diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5d1e6741c5e22..2f2fd4ef453f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -158,9 +158,7 @@ public ClusterState applyStartedShards(ClusterState clusterState, List(), + new BalancingSummary.ClusterShardAssignments(0L, 0L, 0L, 0L, 0L, 0L), + new HashMap<>() + ); + + /** + * A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally + * progress from older to newer results: new summaries are appended at the tail, and the head holds the oldest one. + */ + private final ConcurrentLinkedQueue<BalancingSummary.BalancingRoundSummary> summaries = new ConcurrentLinkedQueue<>(); + + /** + * Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none. + * + * @return returns {@link #EMPTY_RESULTS} if there are no unreported balancing rounds. + */ + public BalancingSummary.CombinedClusterBalancingRoundSummary combineSummaries() { + // TODO: implement + return EMPTY_RESULTS; + } + + /** + * Adds the given balancing round summary to the queue of saved summaries. + * + * @param summary the summary of a single balancing round + */ + public void addBalancerRoundSummary(BalancingSummary.BalancingRoundSummary summary) { + summaries.add(summary); + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java new file mode 100644 index 0000000000000..6c6abda9099ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundStats.java @@ -0,0 +1,95 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +/** + * Data structure to pass allocation statistics between the desired balance classes. + */ +public record BalancingRoundStats( + long startTime, + long endTime, + long newlyAssignedShards, + long unassignedShards, + long totalAllocations, + long undesiredAllocationsExcludingShuttingDownNodes, + boolean executedReconciliation +) { + + public static final BalancingRoundStats EMPTY_BALANCING_ROUND_STATS = new BalancingRoundStats(-1, -1, -1, -1, -1, -1, false); + + public static class Builder { + private long startTime = 0; + private long endTime = 0; + private long newlyAssignedShards = 0; + private long unassignedShards = 0; + long totalAllocations = 0; + long undesiredAllocationsExcludingShuttingDownNodes = 0; + boolean executedReconciliation = false; + + public BalancingRoundStats build() { + return new BalancingRoundStats( + startTime, + endTime, + newlyAssignedShards, + unassignedShards, + totalAllocations, + undesiredAllocationsExcludingShuttingDownNodes, + + executedReconciliation + ); + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public void setEndTime(long endTime) { + this.endTime = endTime; + } + + public void incNewlyAssignedShards() { + ++this.newlyAssignedShards; + } + + public void setUnassignedShards(long numUnassignedShards) { + this.unassignedShards = numUnassignedShards; + } + + public void incTotalAllocations() { + ++this.totalAllocations; + } + + public void incUndesiredAllocationsExcludingShuttingDownNodes() { + ++this.undesiredAllocationsExcludingShuttingDownNodes; + } + + public long 
getStartTime() { + return this.startTime; + } + + public long getEndTime() { + return this.endTime; + } + + public long getTotalAllocations() { + return this.totalAllocations; + } + + public long getUndesiredAllocationsExcludingShuttingDownNodes() { + return this.undesiredAllocationsExcludingShuttingDownNodes; + } + + /** Marks reconciliation as executed, so that the stats produced by {@link #build()} report {@code executedReconciliation} as true. */ + public void setExecutedReconciliation() { + this.executedReconciliation = true; + } + + }; + +}; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java new file mode 100644 index 0000000000000..e5d6c7ae24590 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingSummary.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation.allocator; + +import org.elasticsearch.core.Tuple; + +import java.util.List; +import java.util.Map; + +/** + * Data structures defining the results of allocation balancing rounds. + */ +public class BalancingSummary { + + /** + * Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes + * across all those events: what allocation work was done across some period of time. + * + * @param events A list of all the cluster events that started the balancing rounds and time duration for computation + reconciliation + * of each event. 
+ * @param shardAssignments The sum of all shard movements across all combined balancing rounds. + * @param nodeChanges The total change stats per node in the cluster from the earliest balancing round to the latest one. + */ + record CombinedClusterBalancingRoundSummary( + List> events, + ClusterShardAssignments shardAssignments, + Map nodeChanges + ) {}; + + /** + * Summarizes the impact to the cluster as a result of a rebalancing round. + * + * @param eventStartTime Time at which the desired balance calculation began due to a cluster event. + * @param eventEndTime Time at which the new desired balance calculation was finished. + * @param event Reports what provoked the rebalancing round. The rebalancer only runs when requested, not on a periodic basis. + * @param computationFinishReason Whether the balancing round converged to a final allocation, or exited early for some reason. + * @param shardMovements Lists the total number of shard moves, and breaks down the total into the number of shards moved by category, + * like node shutdown. + * @param nodeChanges A Map of node name to {@link IndividualNodeRebalancingChangeStats} to describe what each node gained and how much + * work each node performed for the balancing round. + */ + record BalancingRoundSummary( + long eventStartTime, + long eventEndTime, + ClusterRebalancingEvent event, + DesiredBalance.ComputationFinishReason computationFinishReason, + ClusterShardAssignments shardMovements, + Map nodeChanges + ) { + @Override + public String toString() { + return "BalancingRoundSummary{" + + "ClusterRebalancingEvent=" + + event + + ", ClusterShardMovements=" + + shardMovements + + ", NodeChangeStats={" + + nodeChanges + + "}" + + '}'; + } + }; + + /** + * General cost-benefit information on the node-level. Describes how each node was improved by a balancing round, and how much work that + * node did to achieve the shard rebalancing. 
+ * + * @param nodeWeightBeforeRebalancing the weight of the node before the balancing round (presumably the balancer's node weight; TODO confirm) + * @param nodeWeightAfterRebalancing the weight of the node after the balancing round (presumably the balancer's node weight; TODO confirm) + * @param dataMovedToNodeInMB amount of data, in MB per the name, moved onto the node by the balancing round (TODO confirm how measured) + * @param dataMovedAwayFromNodeInMB amount of data, in MB per the name, moved off of the node by the balancing round (TODO confirm how measured) + */ + record IndividualNodeRebalancingChangeStats( + float nodeWeightBeforeRebalancing, + float nodeWeightAfterRebalancing, + long dataMovedToNodeInMB, + long dataMovedAwayFromNodeInMB + ) { + @Override + public String toString() { + return "IndividualNodeRebalancingChangeStats{" + + "nodeWeightBeforeRebalancing=" + + nodeWeightBeforeRebalancing + + ", nodeWeightAfterRebalancing=" + + nodeWeightAfterRebalancing + + ", dataMovedToNodeInMB=" + + dataMovedToNodeInMB + + ", dataMovedAwayFromNodeInMB=" + + dataMovedAwayFromNodeInMB + + '}'; + } + }; + + /** + * Tracks and summarizes the more granular reasons why shards are moved between nodes. + * + * @param numShardsMoved total number of shard moves between nodes + * @param numAllocationDeciderForcedShardMoves total number of shards that must be moved because they violate an AllocationDecider rule + * @param numRebalancingShardMoves total number of shards moved to improve cluster balance and are not otherwise required to move + * @param numShutdownForcedShardMoves total number of shards that must move off of a node because it is shutting down + * @param numNewlyAssignedShardsNotMoved presumably the number of shards that were newly assigned rather than moved between nodes (TODO confirm) + * @param numStuckShards total number of shards violating an AllocationDecider on their current node and on every other cluster node + */ + public record ClusterShardAssignments( + long numShardsMoved, + long numAllocationDeciderForcedShardMoves, + long numRebalancingShardMoves, + long numShutdownForcedShardMoves, + long numNewlyAssignedShardsNotMoved, + long numStuckShards + ) { + @Override + public String toString() { + return "ClusterShardMovements{" + + "numShardsMoved=" + + numShardsMoved + + ", numAllocationDeciderForcedShardMoves=" + + numAllocationDeciderForcedShardMoves + + ", numRebalancingShardMoves=" + + numRebalancingShardMoves + + ", numShutdownForcedShardMoves=" + 
numShutdownForcedShardMoves + + ", numNewlyAssignedShardsNotMoved=" + + numNewlyAssignedShardsNotMoved + + ", numStuckShards=" + + numStuckShards + + '}'; + } + }; + + /** + * The cluster event that initiated a rebalancing round. This will tell us what initiated the balancer doing some amount of rebalancing + * work. + */ + enum ClusterRebalancingEvent { + // TODO (Dianna): go through the reroute methods and identify the causes -- many reroute methods accept a 'reason' string -- and + // replace them with this enum to be saved later in a balancing summary. + RerouteCommand, + IndexCreation, + IndexDeletion, + NodeShutdownAndRemoval, + NewNodeAdded + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java index 0745c45b2183a..daa3666af4525 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java @@ -19,6 +19,10 @@ /** * The desired balance of the cluster, indicating which nodes should hold a copy of each shard. * + * @param lastConvergedIndex Identifies what input data the balancer computation round used to produce this {@link DesiredBalance}. See + * {@link DesiredBalanceInput#index()} for details. Each reroute request gets assigned a strictly increasing + * sequence number, and the balancer, which runs async to reroute, uses the latest request's data to compute the + * desired balance. 
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated * @param weightsPerNode The node weights calculated based on * {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight} @@ -31,8 +35,11 @@ public record DesiredBalance( ) { enum ComputationFinishReason { + /** Computation ran to completion */ CONVERGED, + /** Computation exited and published early because a new cluster event occurred that affects computation */ YIELD_TO_NEW_INPUT, + /** Computation stopped and published early to avoid delaying new shard assignment */ STOP_EARLY } @@ -44,6 +51,7 @@ public DesiredBalance(long lastConvergedIndex, Map ass * The placeholder value for {@link DesiredBalance} when the node stands down as master. */ public static final DesiredBalance NOT_MASTER = new DesiredBalance(-2, Map.of()); + /** * The starting value for {@link DesiredBalance} when the node becomes the master. */ @@ -57,6 +65,10 @@ public static boolean hasChanges(DesiredBalance a, DesiredBalance b) { return Objects.equals(a.assignments, b.assignments) == false; } + /** + * Returns the sum of shard movements needed to reach the new desired balance. Doesn't count new shard copies as a move, nor removal or + * unassignment of a shard copy. + */ public static int shardMovements(DesiredBalance old, DesiredBalance updated) { var intersection = Sets.intersection(old.assignments().keySet(), updated.assignments().keySet()); int movements = 0; @@ -70,8 +82,15 @@ public static int shardMovements(DesiredBalance old, DesiredBalance updated) { return movements; } + /** + * Returns the number of shard movements needed to reach the new shard assignment. Doesn't count new shard copies as a move, nor removal + * or unassignment of a shard copy. 
+ */ private static int shardMovements(ShardAssignment old, ShardAssignment updated) { - var movements = Math.min(0, old.assigned() - updated.assigned());// compensate newly started shards + // A shard move should retain the same number of assigned nodes, just swap out one node for another. We will compensate for newly + // started shards -- adding a shard copy is not a move -- by initializing the count with a negative value so that incrementing later + // for a new node zeros out. + var movements = Math.min(0, old.assigned() - updated.assigned()); for (String nodeId : updated.nodeIds()) { if (old.nodeIds().contains(nodeId) == false) { movements++; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java index 3b22221ea7db4..03630c284fa30 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java @@ -415,11 +415,14 @@ boolean hasEnoughIterations(int currentIteration) { } private static Map collectShardAssignments(RoutingNodes routingNodes) { - final var entries = routingNodes.getAssignedShards().entrySet(); - assert entries.stream().flatMap(t -> t.getValue().stream()).allMatch(ShardRouting::started) : routingNodes; - final Map res = Maps.newHashMapWithExpectedSize(entries.size()); - for (var shardAndAssignments : entries) { - res.put(shardAndAssignments.getKey(), ShardAssignment.ofAssignedShards(shardAndAssignments.getValue())); + final var allAssignedShards = routingNodes.getAssignedShards().entrySet(); + assert allAssignedShards.stream().flatMap(t -> t.getValue().stream()).allMatch(ShardRouting::started) : routingNodes; + final Map res = Maps.newHashMapWithExpectedSize(allAssignedShards.size()); + for (var shardIdAndShardRoutings : 
allAssignedShards) { + res.put( + shardIdAndShardRoutings.getKey(), + ShardAssignment.createFromAssignedShardRoutingsList(shardIdAndShardRoutings.getValue()) + ); } return res; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java index 3b3d893bc6848..3089fa2f9d602 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceInput.java @@ -23,13 +23,19 @@ * @param routingAllocation a copy of (the immutable parts of) the context for the allocation decision process * @param ignoredShards a list of the shards for which earlier allocators have claimed responsibility */ -public record DesiredBalanceInput(long index, RoutingAllocation routingAllocation, List ignoredShards) { +public record DesiredBalanceInput( + long index, + RoutingAllocation routingAllocation, + List ignoredShards, + BalancingRoundStats.Builder clusterAllocationStatsBuilder +) { public static DesiredBalanceInput create(long index, RoutingAllocation routingAllocation) { return new DesiredBalanceInput( index, routingAllocation.immutableClone(), - List.copyOf(routingAllocation.routingNodes().unassigned().ignored()) + List.copyOf(routingAllocation.routingNodes().unassigned().ignored()), + new BalancingRoundStats.Builder() ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java index fddf9267cdbb1..57d3b845a7226 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java @@ -28,12 +28,8 @@ */ public class DesiredBalanceMetrics { - public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {} - public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {} - public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP); - public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current"; public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current"; public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current"; @@ -56,8 +52,6 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME = "es.allocator.allocations.node.forecasted_disk_usage_bytes.current"; - public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1); - private volatile boolean nodeIsMaster = false; /** * Number of unassigned shards during last reconciliation @@ -80,16 +74,16 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w ); public void updateMetrics( - AllocationStats allocationStats, + BalancingRoundStats balancingRoundStats, Map weightStatsPerNode, Map nodeAllocationStats ) { - assert allocationStats != null : "allocation stats cannot be null"; + assert balancingRoundStats != null : "allocation stats cannot be null"; assert weightStatsPerNode != null : "node balance weight stats cannot be null"; - if (allocationStats != EMPTY_ALLOCATION_STATS) { - this.unassignedShards = allocationStats.unassignedShards; - this.totalAllocations = allocationStats.totalAllocations; - this.undesiredAllocations = 
allocationStats.undesiredAllocationsExcludingShuttingDownNodes; + if (balancingRoundStats.executedReconciliation()) { + this.unassignedShards = balancingRoundStats.unassignedShards(); + this.totalAllocations = balancingRoundStats.totalAllocations(); + this.undesiredAllocations = balancingRoundStats.undesiredAllocationsExcludingShuttingDownNodes(); } weightStatsPerNodeRef.set(weightStatsPerNode); allocationStatsPerNodeRef.set(nodeAllocationStats); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java index 83b370c1a7928..bc10c85b6d8ca 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java @@ -20,10 +20,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; -import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; -import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; @@ -36,9 +33,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -54,6 +49,10 @@ 
public class DesiredBalanceReconciler { private static final Logger logger = LogManager.getLogger(DesiredBalanceReconciler.class); + /** + * The minimum interval that log messages will be written if the number of undesired shard allocations reaches the percentage of total + * shards set by {@link #UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING}. + */ public static final Setting UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING = Setting.timeSetting( "cluster.routing.allocation.desired_balance.undesired_allocations.log_interval", TimeValue.timeValueHours(1), @@ -62,6 +61,10 @@ public class DesiredBalanceReconciler { Setting.Property.NodeScope ); + /** + * Warning log messages may be periodically written if the number of shards that are on undesired nodes reaches this percentage setting. + * Works together with {@link #UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING} to log on a periodic basis. + */ public static final Setting UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING = Setting.doubleSetting( "cluster.routing.allocation.desired_balance.undesired_allocations.threshold", 0.1, @@ -74,16 +77,8 @@ public class DesiredBalanceReconciler { private double undesiredAllocationsLogThreshold; private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering(); private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering(); - private final DesiredBalanceMetrics desiredBalanceMetrics; - private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; - - public DesiredBalanceReconciler( - ClusterSettings clusterSettings, - ThreadPool threadPool, - DesiredBalanceMetrics desiredBalanceMetrics, - NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator - ) { - this.desiredBalanceMetrics = desiredBalanceMetrics; + + public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) { this.undesiredAllocationLogInterval = new FrequencyCappedAction( threadPool.relativeTimeInMillisSupplier(), 
TimeValue.timeValueMinutes(5) @@ -93,14 +88,21 @@ public DesiredBalanceReconciler( UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING, value -> this.undesiredAllocationsLogThreshold = value ); - this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; } - public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + /** + * Applies a desired shard allocation to the routing table by initializing and relocating shards in the cluster state. + * + * @param desiredBalance The new desired cluster shard allocation + * @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to + * reach the given desired balance. + * @param statsBuilder Accumulates statistics about this reconciliation; it is handed to the {@code Reconciliation} run, which updates it. + */ + public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation, BalancingRoundStats.Builder statsBuilder) { var nodeIds = allocation.routingNodes().getAllNodeIds(); allocationOrdering.retainNodes(nodeIds); moveOrdering.retainNodes(nodeIds); - new Reconciliation(desiredBalance, allocation).run(); + new Reconciliation(desiredBalance, allocation).run(statsBuilder); } public void clear() { @@ -120,7 +122,7 @@ private class Reconciliation { this.routingNodes = allocation.routingNodes(); } - void run() { + void run(BalancingRoundStats.Builder statsBuilder) { try (var ignored = allocation.withReconcilingFlag()) { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); @@ -129,52 +131,32 @@ void run() { // no data nodes, so fail allocation to report red health failAllocationOfNewPrimaries(allocation); logger.trace("no nodes available, nothing to reconcile"); return; } if (desiredBalance.assignments().isEmpty()) { // no desired state yet but it is on its way and we'll reroute again when it is ready logger.trace("desired balance is empty, nothing to reconcile"); return; } + statsBuilder.setExecutedReconciliation(); // compute next moves towards current desired balance: // 1. 
allocate unassigned shards first logger.trace("Reconciler#allocateUnassigned"); - allocateUnassigned(); + allocateUnassigned(statsBuilder); assert allocateUnassignedInvariant(); // 2. move any shards that cannot remain where they are logger.trace("Reconciler#moveShards"); - moveShards(); + moveShards(statsBuilder); // 3. move any other shards that are desired elsewhere logger.trace("Reconciler#balance"); - var allocationStats = balance(); + balance(statsBuilder); logger.debug("Reconciliation is complete"); - - updateDesireBalanceMetrics(allocationStats); - } - } - - private void updateDesireBalanceMetrics(AllocationStats allocationStats) { - var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( - allocation.metadata(), - allocation.routingNodes(), - allocation.clusterInfo(), - desiredBalance - ); - Map filteredNodeAllocationStatsAndWeights = new HashMap<>( - nodesStatsAndWeights.size() - ); - for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) { - var node = allocation.nodes().get(nodeStatsAndWeight.getKey()); - if (node != null) { - filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); - } } - desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); } private boolean allocateUnassignedInvariant() { @@ -238,7 +217,7 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { } } - private void allocateUnassigned() { + private void allocateUnassigned(BalancingRoundStats.Builder statsBuilder) { RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); if (logger.isTraceEnabled()) { logger.trace("Start allocating unassigned shards: {}", routingNodes.toString()); @@ -312,6 +291,7 @@ private void allocateUnassigned() { logger.debug("Assigning shard [{}] to {} [{}]", shard, nodeIdsIterator.source, nodeId); long shardSize = getExpectedShardSize(shard, 
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, allocation); routingNodes.initializeShard(shard, nodeId, null, shardSize, allocation.changes()); + statsBuilder.incNewlyAssignedShards(); allocationOrdering.recordAllocation(nodeId); if (shard.primary() == false) { // copy over the same replica shards to the secondary array so they will get allocated @@ -446,7 +426,7 @@ private boolean isIgnored(RoutingNodes routingNodes, ShardRouting shard, ShardAs return assignment.total() - assignment.ignored() <= assigned; } - private void moveShards() { + private void moveShards(BalancingRoundStats.Builder statsBuilder) { // Iterate over all started shards and check if they can remain. In the presence of throttling shard movements, // the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the shards. for (final var iterator = OrderedShardsIterator.createForNecessaryMoves(allocation, moveOrdering); iterator.hasNext();) { @@ -469,6 +449,8 @@ private void moveShards() { } if (allocation.deciders().canAllocate(shardRouting, allocation).type() != Decision.Type.YES) { + // TODO (Dianna): are these 'stuck' shards that cannot be allocated anyplace due to restrictions? + // cannot allocate anywhere, no point in looking for a target node continue; } @@ -497,14 +479,12 @@ private void moveShards() { } } - private AllocationStats balance() { + private void balance(BalancingRoundStats.Builder statsBuilder) { if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) { - return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS; + return; } - int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size(); - int totalAllocations = 0; - int undesiredAllocationsExcludingShuttingDownNodes = 0; + statsBuilder.setUnassignedShards(routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size()); // Iterate over all started shards and try to move any which are on undesired nodes. 
In the presence of throttling shard // movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the @@ -512,7 +492,7 @@ private AllocationStats balance() { for (final var iterator = OrderedShardsIterator.createForBalancing(allocation, moveOrdering); iterator.hasNext();) { final var shardRouting = iterator.next(); - totalAllocations++; + statsBuilder.incTotalAllocations(); if (shardRouting.started() == false) { // can only rebalance started shards @@ -531,7 +511,7 @@ private AllocationStats balance() { } if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) { - undesiredAllocationsExcludingShuttingDownNodes++; + statsBuilder.incUndesiredAllocationsExcludingShuttingDownNodes(); } if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) { @@ -565,12 +545,14 @@ private AllocationStats balance() { } } - maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size()); - - return new AllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes); + maybeLogUndesiredAllocationsWarning( + statsBuilder.getTotalAllocations(), + statsBuilder.getUndesiredAllocationsExcludingShuttingDownNodes(), + routingNodes.size() + ); } - private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) { + private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) { // more shards than cluster can relocate with one reroute final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount; final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * totalAllocations; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java 
b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java index 5be26f0b3e8c7..fc87b1d10adf6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java @@ -16,7 +16,9 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy; import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; @@ -38,6 +40,7 @@ import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -56,18 +59,42 @@ public class DesiredBalanceShardsAllocator implements ShardsAllocator { private final ShardsAllocator delegateAllocator; private final ThreadPool threadPool; + /** + * This is a callback to run {@link AllocationService#executeWithRoutingAllocation(ClusterState, String, RerouteStrategy)}, which + * produces a new ClusterState with the changes made by {@link DesiredBalanceReconciler#reconcile}. The {@link RerouteStrategy} provided + * to the callback calls into {@link #desiredBalanceReconciler} for the changes. The {@link #masterServiceTaskQueue} will apply the + * cluster state update. 
+ */ private final DesiredBalanceReconcilerAction reconciler; private final DesiredBalanceComputer desiredBalanceComputer; + /** + * Reconciliation ({@link DesiredBalanceReconciler#reconcile(DesiredBalance, RoutingAllocation, BalancingRoundStats.Builder)}) takes + * the {@link DesiredBalance} output of {@link DesiredBalanceComputer#compute} and identifies how shards need to be added, moved or + * removed to go from the current cluster shard allocation to the new desired allocation. + */ private final DesiredBalanceReconciler desiredBalanceReconciler; private final ContinuousComputation desiredBalanceComputation; - private final PendingListenersQueue queue; + /** + * Saves and runs listeners after DesiredBalance computations complete. + */ + private final PendingListenersQueue pendingListenersQueue; + /** + * Each reroute request gets assigned a monotonically increasing sequence number. Many reroute requests may arrive before the balancer + * asynchronously runs a computation. The balancer will use the latest request and save this sequence number to track back to the + * request. + */ private final AtomicLong indexGenerator = new AtomicLong(-1); private final ConcurrentLinkedQueue> pendingDesiredBalanceMoves = new ConcurrentLinkedQueue<>(); private final MasterServiceTaskQueue masterServiceTaskQueue; private final AtomicReference currentDesiredBalanceRef = new AtomicReference<>(DesiredBalance.NOT_MASTER); private volatile boolean resetCurrentDesiredBalance = false; private final Set processedNodeShutdowns = new HashSet<>(); + private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator; private final DesiredBalanceMetrics desiredBalanceMetrics; + /** + * Manages balancer round results in order to report metrics on the balancer activity in a configurable manner. 
+ */ + private final AllocationBalancingRoundSummaryService balancerRoundSummaryService; // stats protected final CounterMetric computationsSubmitted = new CounterMetric(); @@ -112,16 +139,13 @@ public DesiredBalanceShardsAllocator( NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator ) { this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry()); + this.balancerRoundSummaryService = new AllocationBalancingRoundSummaryService(); + this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator; this.delegateAllocator = delegateAllocator; this.threadPool = threadPool; this.reconciler = reconciler; this.desiredBalanceComputer = desiredBalanceComputer; - this.desiredBalanceReconciler = new DesiredBalanceReconciler( - clusterService.getClusterSettings(), - threadPool, - desiredBalanceMetrics, - nodeAllocationStatsAndWeightsCalculator - ); + this.desiredBalanceReconciler = new DesiredBalanceReconciler(clusterService.getClusterSettings(), threadPool); this.desiredBalanceComputation = new ContinuousComputation<>(threadPool.generic()) { @Override @@ -137,6 +161,9 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) { return; } + BalancingRoundStats.Builder statsBuilder = new BalancingRoundStats.Builder(); + statsBuilder.setStartTime(threadPool.relativeTimeInMillis()); + recordTime( cumulativeComputationTime, // We set currentDesiredBalance back to INITIAL when the node stands down as master in onNoLongerMaster. 
@@ -169,13 +196,13 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) { "Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation", index ); - submitReconcileTask(currentDesiredBalance); + submitReconcileTask(currentDesiredBalance, statsBuilder); var newInput = DesiredBalanceInput.create(indexGenerator.incrementAndGet(), desiredBalanceInput.routingAllocation()); desiredBalanceComputation.compareAndEnqueue(desiredBalanceInput, newInput); } else if (isFresh(desiredBalanceInput)) { logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index); computationsConverged.inc(); - submitReconcileTask(currentDesiredBalance); + submitReconcileTask(currentDesiredBalance, statsBuilder); } else { logger.debug("Desired balance computation for [{}] is discarded as newer one is submitted", index); } @@ -199,7 +226,7 @@ public String toString() { return "DesiredBalanceShardsAllocator#allocate"; } }; - this.queue = new PendingListenersQueue(); + this.pendingListenersQueue = new PendingListenersQueue(); this.masterServiceTaskQueue = clusterService.createTaskQueue( "reconcile-desired-balance", Priority.URGENT, @@ -235,12 +262,13 @@ public void allocate(RoutingAllocation allocation, ActionListener listener var index = indexGenerator.incrementAndGet(); logger.debug("Executing allocate for [{}]", index); - queue.add(index, listener); + pendingListenersQueue.add(index, listener); // This can only run on master, so unset not-master if exists if (currentDesiredBalanceRef.compareAndSet(DesiredBalance.NOT_MASTER, DesiredBalance.BECOME_MASTER_INITIAL)) { logger.debug("initialized desired balance for becoming master"); } - desiredBalanceComputation.onNewInput(DesiredBalanceInput.create(index, allocation)); + var desiredBalanceInput = DesiredBalanceInput.create(index, allocation); + desiredBalanceComputation.onNewInput(desiredBalanceInput); if 
(allocation.routingTable().indicesRouting().isEmpty()) { logger.debug("No eager reconciliation needed for empty routing table"); @@ -249,7 +277,7 @@ public void allocate(RoutingAllocation allocation, ActionListener listener // Starts reconciliation towards desired balance that might have not been updated with a recent calculation yet. // This is fine as balance should have incremental rather than radical changes. // This should speed up achieving the desired balance in cases current state is still different from it (due to THROTTLING). - reconcile(currentDesiredBalanceRef.get(), allocation); + reconcile(currentDesiredBalanceRef.get(), allocation, desiredBalanceInput.clusterAllocationStatsBuilder()); } private void processNodeShutdowns(ClusterState clusterState) { @@ -314,17 +342,80 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) { } } - protected void submitReconcileTask(DesiredBalance desiredBalance) { - masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null); + /** + * Submits the desired balance to be reconciled (applies the desired changes to the routing table) and creates and publishes a new + * cluster state. The data nodes will receive and apply the new cluster state to start/move/remove shards. 
+ */ + protected void submitReconcileTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { + masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance, statsBuilder), null); } - protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation, BalancingRoundStats.Builder statsBuilder) { if (logger.isTraceEnabled()) { logger.trace("Reconciling desired balance: {}", desiredBalance); } else { logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex()); } - recordTime(cumulativeReconciliationTime, () -> desiredBalanceReconciler.reconcile(desiredBalance, allocation)); + recordTime(cumulativeReconciliationTime, () -> { + var nodesStatsAndWeightsBeforeReconciliationChanges = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + + desiredBalanceReconciler.reconcile(desiredBalance, allocation, statsBuilder); + + // TODO (Dianna): dedupe changes and filtering with updateDesireBalanceMetrics work + var nodesStatsAndWeightsAfterReconciliationChanges = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + allocation.metadata(), + allocation.routingNodes(), + allocation.clusterInfo(), + desiredBalance + ); + Map nodeChanges = new HashMap<>(); + for (var nodeStatsAndWeightBefore : nodesStatsAndWeightsBeforeReconciliationChanges.entrySet()) { + var node = allocation.nodes().get(nodeStatsAndWeightBefore.getKey()); + var nodeStatsAndWeightAfter = nodesStatsAndWeightsAfterReconciliationChanges.get(nodeStatsAndWeightBefore.getKey()); + // TODO (Dianna): is this node!=null check even necessary? 
+ if (node != null && nodeStatsAndWeightAfter != null) { + var nodeChange = new BalancingSummary.IndividualNodeRebalancingChangeStats( + nodeStatsAndWeightBefore.getValue().currentNodeWeight(), + nodeStatsAndWeightAfter.currentNodeWeight(), + // TODO (Dianna): dataMovedTo/AwayFromNodeInMB + 0, + 0 + ); + nodeChanges.put(nodeStatsAndWeightBefore.getKey(), nodeChange); + } + } + + statsBuilder.setEndTime(threadPool.relativeTimeInMillis()); + balancerRoundSummaryService.addBalancerRoundSummary( + new BalancingSummary.BalancingRoundSummary( + statsBuilder.getStartTime(), + statsBuilder.getEndTime(), + // TODO: non dummy value + BalancingSummary.ClusterRebalancingEvent.RerouteCommand, + desiredBalance.finishReason(), + new BalancingSummary.ClusterShardAssignments( + 1, //// TODO numShardsMoved + 1, //// TODO numAllocationDeciderForcedShardMoves + 1, //// TODO numRebalancingShardMoves + 1, //// TODO numShutdownForcedShardMoves + 1, //// TODO numNewlyAssignedShardsNotMoved + //// TODO numStuckShards - the current val is wrong. 
+ statsBuilder.getUndesiredAllocationsExcludingShuttingDownNodes() + ), + nodeChanges + ) + ); + + // TODO (Dianna): revisit how I want to use ClusterAllocationStats.Builder - I don't really want to call build, but I do want to + // hang things off of it to grab later + updateDesireBalanceMetrics(desiredBalance, allocation, statsBuilder.build()); + }); + if (logger.isTraceEnabled()) { logger.trace("Reconciled desired balance: {}", desiredBalance); } else { @@ -332,7 +423,7 @@ protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation alloca } } - private RerouteStrategy createReconcileAllocationAction(DesiredBalance desiredBalance) { + private RerouteStrategy createReconcileAllocationAction(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { return new RerouteStrategy() { @Override public void removeDelayMarkers(RoutingAllocation allocation) { @@ -345,7 +436,7 @@ public void removeDelayMarkers(RoutingAllocation allocation) { @Override public void execute(RoutingAllocation allocation) { - reconcile(desiredBalance, allocation); + reconcile(desiredBalance, allocation, statsBuilder); } }; } @@ -358,6 +449,32 @@ public void resetDesiredBalance() { resetCurrentDesiredBalance = true; } + private void updateDesireBalanceMetrics( + DesiredBalance desiredBalance, + RoutingAllocation routingAllocation, + BalancingRoundStats balancingRoundStats + ) { + if (balancingRoundStats == BalancingRoundStats.EMPTY_BALANCING_ROUND_STATS) { + return; + } + + var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights( + routingAllocation.metadata(), + routingAllocation.routingNodes(), + routingAllocation.clusterInfo(), + desiredBalance + ); + Map filteredNodeAllocationStatsAndWeights = + new HashMap<>(nodesStatsAndWeights.size()); + for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) { + var node = routingAllocation.nodes().get(nodeStatsAndWeight.getKey()); + if (node != null) { + 
filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue()); + } + } + desiredBalanceMetrics.updateMetrics(balancingRoundStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights); + } + public DesiredBalanceStats getStats() { return new DesiredBalanceStats( Math.max(currentDesiredBalanceRef.get().lastConvergedIndex(), 0L), @@ -378,7 +495,7 @@ public DesiredBalanceStats getStats() { private void onNoLongerMaster() { if (indexGenerator.getAndSet(-1) != -1) { currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER); - queue.completeAllAsNotMaster(); + pendingListenersQueue.completeAllAsNotMaster(); pendingDesiredBalanceMoves.clear(); desiredBalanceReconciler.clear(); desiredBalanceMetrics.zeroAllMetrics(); @@ -387,9 +504,11 @@ private void onNoLongerMaster() { private static final class ReconcileDesiredBalanceTask implements ClusterStateTaskListener { private final DesiredBalance desiredBalance; + private final BalancingRoundStats.Builder statsBuilder; - private ReconcileDesiredBalanceTask(DesiredBalance desiredBalance) { + private ReconcileDesiredBalanceTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { this.desiredBalance = desiredBalance; + this.statsBuilder = statsBuilder; } @Override @@ -426,9 +545,9 @@ private ClusterState applyBalance( try (var ignored = batchExecutionContext.dropHeadersContext()) { var newState = reconciler.apply( batchExecutionContext.initialState(), - createReconcileAllocationAction(latest.getTask().desiredBalance) + createReconcileAllocationAction(latest.getTask().desiredBalance, latest.getTask().statsBuilder) ); - latest.success(() -> queue.complete(latest.getTask().desiredBalance.lastConvergedIndex())); + latest.success(() -> pendingListenersQueue.complete(latest.getTask().desiredBalance.lastConvergedIndex())); return newState; } } @@ -447,7 +566,7 @@ private static void discardSupersededTasks( // only for tests - in production, this happens after reconciliation protected 
final void completeToLastConvergedIndex() { - queue.complete(currentDesiredBalanceRef.get().lastConvergedIndex()); + pendingListenersQueue.complete(currentDesiredBalanceRef.get().lastConvergedIndex()); } private void recordTime(CounterMetric metric, Runnable action) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java index e1b58cf79ac09..5b14277f2c651 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/PendingListenersQueue.java @@ -24,6 +24,10 @@ import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME; import static org.elasticsearch.cluster.service.MasterService.MASTER_UPDATE_THREAD_NAME; +/** + * Registers listeners with an `index` number ({@link #add(long, ActionListener)}) and then completes them whenever the latest index number + * is greater or equal to a listener's index value ({@link #complete(long)}). + */ public class PendingListenersQueue { private static final Logger logger = LogManager.getLogger(PendingListenersQueue.class); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java index 4fb9137cb4544..2bd1b9bb2bb64 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardAssignment.java @@ -17,6 +17,14 @@ import static java.util.Collections.unmodifiableSet; +/** + * Simple shard assignment summary of shard copies for a particular index shard. + * + * @param nodeIds The node IDs of nodes holding a shard copy. 
+ * @param total The total number of shard copies. + * @param unassigned The number of unassigned shard copies. + * @param ignored The number of ignored shard copies. + */ public record ShardAssignment(Set nodeIds, int total, int unassigned, int ignored) { public ShardAssignment { @@ -28,9 +36,13 @@ public int assigned() { return nodeIds.size(); } - public static ShardAssignment ofAssignedShards(List routings) { + /** + * Helper method to instantiate a new ShardAssignment from a given list of ShardRouting instances. Assumes all shards are assigned. + */ + public static ShardAssignment createFromAssignedShardRoutingsList(List routings) { var nodeIds = new LinkedHashSet(); for (ShardRouting routing : routings) { + assert routing.unassignedInfo() == null : "Expected assigned shard copies only, unassigned info: " + routing.unassignedInfo(); nodeIds.add(routing.currentNodeId()); } return new ShardAssignment(unmodifiableSet(nodeIds), routings.size(), 0, 0); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 679d04224aefe..262115240487d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -685,7 +685,12 @@ public void testDesiredBalanceShouldConvergeInABigCluster() { var settings = Settings.EMPTY; - var input = new DesiredBalanceInput(randomInt(), routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), List.of()); + var input = new DesiredBalanceInput( + randomInt(), + routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), + List.of(), + new BalancingRoundStats.Builder() + ); var desiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator(settings)).compute( 
DesiredBalance.BECOME_MASTER_INITIAL, input, @@ -834,7 +839,12 @@ public void testComputeConsideringShardSizes() { var desiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator(settings)).compute( initial, - new DesiredBalanceInput(randomInt(), routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), List.of()), + new DesiredBalanceInput( + randomInt(), + routingAllocationWithDecidersOf(clusterState, clusterInfo, settings), + List.of(), + new BalancingRoundStats.Builder() + ), queue(), input -> true ); @@ -994,7 +1004,7 @@ public void testAccountForSizeOfMisplacedShardsDuringNewComputation() { ); var nextDesiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator()).compute( initialDesiredBalance, - new DesiredBalanceInput(2, allocation, List.of()), + new DesiredBalanceInput(2, allocation, List.of(), new BalancingRoundStats.Builder()), queue(), input -> true ); @@ -1120,7 +1130,7 @@ public void testAccountForSizeOfAllInitializingShardsDuringAllocation() { ); var nextDesiredBalance = createDesiredBalanceComputer(new BalancedShardsAllocator()).compute( initialDesiredBalance, - new DesiredBalanceInput(2, allocation, List.of()), + new DesiredBalanceInput(2, allocation, List.of(), new BalancingRoundStats.Builder()), queue(), input -> true ); @@ -1379,7 +1389,7 @@ private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map< } private static DesiredBalanceInput createInput(ClusterState clusterState, ShardRouting... 
ignored) { - return new DesiredBalanceInput(randomInt(), routingAllocationOf(clusterState), List.of(ignored)); + return new DesiredBalanceInput(randomInt(), routingAllocationOf(clusterState), List.of(ignored), new BalancingRoundStats.Builder()); } private static RoutingAllocation routingAllocationOf(ClusterState clusterState) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java index 9e6e080f38216..842604ebe76e0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats; import org.elasticsearch.telemetry.InstrumentType; import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -24,10 +23,25 @@ public class DesiredBalanceMetricsTests extends ESTestCase { public void testZeroAllMetrics() { DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(MeterRegistry.NOOP); + long startTime = randomNonNegativeLong(); + long endTime = randomNonNegativeLong(); + long newlyAssignedShards = randomNonNegativeLong(); long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomNonNegativeLong(); long undesiredAllocations = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics( + new BalancingRoundStats( + startTime, + endTime, + newlyAssignedShards, + unassignedShards, + totalAllocations, + undesiredAllocations, + true + ), + Map.of(), + Map.of() + ); 
assertEquals(totalAllocations, metrics.totalAllocations()); assertEquals(unassignedShards, metrics.unassignedShards()); assertEquals(undesiredAllocations, metrics.undesiredAllocations()); @@ -41,10 +55,25 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); + long startTime = randomNonNegativeLong(); + long endTime = randomNonNegativeLong(); + long newlyAssignedShards = randomNonNegativeLong(); long unassignedShards = randomNonNegativeLong(); long totalAllocations = randomLongBetween(100, 10000000); long undesiredAllocations = randomLongBetween(0, totalAllocations); - metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of()); + metrics.updateMetrics( + new BalancingRoundStats( + startTime, + endTime, + newlyAssignedShards, + unassignedShards, + totalAllocations, + undesiredAllocations, + true + ), + Map.of(), + Map.of() + ); // Collect when not master meterRegistry.getRecorder().collect(); @@ -103,8 +132,10 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() { public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() { RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry(); DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry); + long startTime = randomNonNegativeLong(); + long endTime = randomNonNegativeLong(); long unassignedShards = randomNonNegativeLong(); - metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0), Map.of(), Map.of()); + metrics.updateMetrics(new BalancingRoundStats(startTime, endTime, 0, unassignedShards, 0, 0, true), Map.of(), Map.of()); metrics.setNodeIsMaster(true); meterRegistry.getRecorder().collect(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index cd94c87bb4b57..89ad040b18713 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -110,11 +110,13 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { + private BalancingRoundStats.Builder statsBuilderDummy = new BalancingRoundStats.Builder(); + public void testNoChangesOnEmptyDesiredBalance() { final var clusterState = DesiredBalanceComputerTests.createInitialClusterState(3); final var routingAllocation = createRoutingAllocationFrom(clusterState); - reconcile(routingAllocation, new DesiredBalance(1, Map.of())); + reconcile(routingAllocation, new DesiredBalance(1, Map.of()), statsBuilderDummy); assertFalse(routingAllocation.routingNodesChanged()); } @@ -186,7 +188,8 @@ public void testFailsNewPrimariesIfNoDataNodes() { new ShardId(clusterState.metadata().index(DesiredBalanceComputerTests.TEST_INDEX).getIndex(), 0), new ShardAssignment(Set.of("node-0"), 1, 0, 0) ) - ) + ), + statsBuilderDummy ); assertTrue(routingAllocation.routingNodesChanged()); @@ -236,7 +239,7 @@ public void testUnassignedPrimariesBeforeUnassignedReplicas() { ); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -319,7 +322,7 @@ public void testUnassignedShardsInterleaving() { final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> 
reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings) @@ -409,7 +412,7 @@ public void testUnassignedShardsPriority() { final var assignReplicas = new AtomicBoolean(false); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -517,7 +520,7 @@ public void testUnassignedRespectsDesiredBalance() { final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider() ); @@ -608,7 +611,7 @@ public void testUnassignedAllocationPredictsDiskUsage() { final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), () -> clusterInfo, () -> snapshotShardSizeInfo, new SameShardAllocationDecider(clusterSettings), @@ -648,7 +651,7 @@ public void testUnassignedSkipsEquivalentReplicas() { final var replicaDecision = randomFrom(Decision.THROTTLE, Decision.NO); final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = 
createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new AllocationDecider() { @@ -707,7 +710,7 @@ public void testUnassignedSetsAllocationStatusOnUnassignedShards() { final var nonYesDecision = randomFrom(Decision.THROTTLE, Decision.NO); final var desiredBalance = desiredBalance(clusterState, (shardId, nodeId) -> true); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new AllocationDecider() { @@ -765,7 +768,7 @@ public void testUnassignedPrimariesThrottlingAndFallback() { final var allocationFilter = new AtomicReference>(); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance), + routingAllocation -> reconcile(routingAllocation, desiredBalance, statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -843,7 +846,7 @@ public void testMoveShards() { final var desiredBalance = new AtomicReference<>(desiredBalance(clusterState, (shardId, nodeId) -> true)); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance.get()), + routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -964,7 +967,7 @@ public void testRebalance() { 
desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-0") || nodeId.equals("node-1")) ); final var allocationService = createTestAllocationService( - routingAllocation -> reconcile(routingAllocation, desiredBalance.get()), + routingAllocation -> reconcile(routingAllocation, desiredBalance.get(), statsBuilderDummy), new SameShardAllocationDecider(clusterSettings), new ReplicaAfterPrimaryActiveAllocationDecider(), new ThrottlingAllocationDecider(clusterSettings), @@ -1044,7 +1047,7 @@ public void testDoNotRebalanceToTheNodeThatNoLongerExists() { Map.of(shardId, new ShardAssignment(Set.of("node-1"), 1, 0, 0)) // shard is assigned to the node that has left ); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0"), notNullValue()); assertThat(allocation.routingNodes().node("node-1"), nullValue()); @@ -1070,7 +1073,7 @@ public void testDoNotAllocateIgnoredShards() { Map.of(shardId, new ShardAssignment(Set.of(), 1, 1, 1)) // shard is ignored ); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0").size(), equalTo(0)); assertThat(allocation.routingNodes().unassigned().ignored(), hasSize(1)); @@ -1100,7 +1103,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var allocation = createRoutingAllocationFrom(clusterState, initialForcedAllocationDecider); final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(desiredNodeIds, 2, 0, 0))); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); // only primary is allocated to the fallback node, replica stays unassigned assertThat(allocation.routingNodes().node("node-0").size() + allocation.routingNodes().node("node-1").size(), equalTo(0)); @@ -1130,7 +1133,7 @@ public Optional> getForcedInitialShardAllocationToNodes(ShardRouting final var allocation = 
createRoutingAllocationFrom(clusterState, allocationIsNotPossibleOnDesiredNodeDesiredNode); final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(Set.of("node-0"), 1, 0, 0))); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0").size(), equalTo(0)); assertThat(allocation.routingNodes().node("node-1").size(), equalTo(1)); @@ -1164,7 +1167,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing final var allocation = createRoutingAllocationFrom(clusterState, initialForcedAllocationDecider); final var balance = new DesiredBalance(1, Map.of(shardId, new ShardAssignment(Set.of("node-0"), 1, 0, 0))); - reconcile(allocation, balance); + reconcile(allocation, balance, statsBuilderDummy); assertThat(allocation.routingNodes().node("node-0").size(), equalTo(0)); assertThat(allocation.routingNodes().node("node-1").size(), equalTo(0)); @@ -1212,12 +1215,7 @@ public void testRebalanceDoesNotCauseHotSpots() { new ConcurrentRebalanceAllocationDecider(clusterSettings), new ThrottlingAllocationDecider(clusterSettings) }; - var reconciler = new DesiredBalanceReconciler( - clusterSettings, - new DeterministicTaskQueue().getThreadPool(), - DesiredBalanceMetrics.NOOP, - EMPTY_NODE_ALLOCATION_STATS - ); + var reconciler = new DesiredBalanceReconciler(clusterSettings, new DeterministicTaskQueue().getThreadPool()); var totalOutgoingMoves = new HashMap(); for (int i = 0; i < numberOfNodes; i++) { @@ -1230,7 +1228,7 @@ public void testRebalanceDoesNotCauseHotSpots() { while (true) { var allocation = createRoutingAllocationFrom(clusterState, deciders); - reconciler.reconcile(balance, allocation); + reconciler.reconcile(balance, allocation, statsBuilderDummy); var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING); if (initializing.isEmpty()) { @@ -1299,12 +1297,7 @@ public void 
testShouldLogOnTooManyUndesiredAllocations() { final var timeInMillisSupplier = new AtomicLong(); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(timeInMillisSupplier::incrementAndGet); - var reconciler = new DesiredBalanceReconciler( - createBuiltInClusterSettings(), - threadPool, - DesiredBalanceMetrics.NOOP, - EMPTY_NODE_ALLOCATION_STATS - ); + var reconciler = new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool); final long initialDelayInMillis = TimeValue.timeValueMinutes(5).getMillis(); timeInMillisSupplier.addAndGet(randomLongBetween(initialDelayInMillis, 2 * initialDelayInMillis)); @@ -1315,7 +1308,11 @@ public void testShouldLogOnTooManyUndesiredAllocations() { + ") are not on their desired nodes, which exceeds the warn threshold of [10%]"; // Desired assignment matches current routing table assertThatLogger( - () -> reconciler.reconcile(new DesiredBalance(1, allShardsDesiredOnDataNode1), createRoutingAllocationFrom(clusterState)), + () -> reconciler.reconcile( + new DesiredBalance(1, allShardsDesiredOnDataNode1), + createRoutingAllocationFrom(clusterState), + statsBuilderDummy + ), DesiredBalanceReconciler.class, new MockLog.UnseenEventExpectation( "Should not log if all shards on desired location", @@ -1325,7 +1322,11 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ) ); assertThatLogger( - () -> reconciler.reconcile(new DesiredBalance(1, allShardsDesiredOnDataNode2), createRoutingAllocationFrom(clusterState)), + () -> reconciler.reconcile( + new DesiredBalance(1, allShardsDesiredOnDataNode2), + createRoutingAllocationFrom(clusterState), + statsBuilderDummy + ), DesiredBalanceReconciler.class, node1ShuttingDown ? 
new MockLog.UnseenEventExpectation( @@ -1342,7 +1343,11 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ) ); assertThatLogger( - () -> reconciler.reconcile(new DesiredBalance(1, allShardsDesiredOnDataNode2), createRoutingAllocationFrom(clusterState)), + () -> reconciler.reconcile( + new DesiredBalance(1, allShardsDesiredOnDataNode2), + createRoutingAllocationFrom(clusterState), + statsBuilderDummy + ), DesiredBalanceReconciler.class, new MockLog.UnseenEventExpectation( "Should not log immediate second too many shards on undesired locations", @@ -1353,11 +1358,14 @@ public void testShouldLogOnTooManyUndesiredAllocations() { ); } - private static void reconcile(RoutingAllocation routingAllocation, DesiredBalance desiredBalance) { + private static void reconcile( + RoutingAllocation routingAllocation, + DesiredBalance desiredBalance, + BalancingRoundStats.Builder statsBuilder + ) { final var threadPool = mock(ThreadPool.class); when(threadPool.relativeTimeInMillisSupplier()).thenReturn(new AtomicLong()::incrementAndGet); - new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool, DesiredBalanceMetrics.NOOP, EMPTY_NODE_ALLOCATION_STATS) - .reconcile(desiredBalance, routingAllocation); + new DesiredBalanceReconciler(createBuiltInClusterSettings(), threadPool).reconcile(desiredBalance, routingAllocation, statsBuilder); } private static boolean isReconciled(RoutingNode node, DesiredBalance balance) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java index 21d547c1593b8..94e0373dc8dcd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java @@ -958,12 +958,16 
@@ public void allocate(RoutingAllocation allocation, ActionListener listener } @Override - protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + protected void reconcile( + DesiredBalance desiredBalance, + RoutingAllocation allocation, + BalancingRoundStats.Builder statsBuilder + ) { fail("should not call reconcile"); } @Override - protected void submitReconcileTask(DesiredBalance desiredBalance) { + protected void submitReconcileTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { assertThat(desiredBalance.lastConvergedIndex(), equalTo(0L)); reconciliationTaskSubmitted.set(true); lastListener.onResponse(null); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java index 760900817780a..2c15addfe217b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceTests.java @@ -46,19 +46,19 @@ public void testShardMovements() { ); assertThat( - "1 shard movements when existing shard is moved and new shard copy is unassigned", + "1 shard movement when an existing shard copy is moved and new shard copy is unassigned", shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("a", "c"), 3, 1, 0)), equalTo(1) ); assertThat( - "1 shard movement", + "1 shard movement when an existing shard copy is moved", shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("a", "c"), 2, 0, 0)), equalTo(1) ); assertThat( - "2 shard movement", + "2 shard movements when both shard copies are moved to new nodes", shardMovements(new ShardAssignment(Set.of("a", "b"), 2, 0, 0), new ShardAssignment(Set.of("c", "d"), 2, 0, 0)), equalTo(2) ); @@ -77,10 +77,10 @@ 
public void testShardMovements() { } private static int shardMovements(ShardAssignment old, ShardAssignment updated) { - return DesiredBalance.shardMovements(of(old), of(updated)); + return DesiredBalance.shardMovements(createDesiredBalanceWith(old), createDesiredBalanceWith(updated)); } - private static DesiredBalance of(ShardAssignment assignment) { + private static DesiredBalance createDesiredBalanceWith(ShardAssignment assignment) { return new DesiredBalance(1, Map.of(new ShardId("index", "_na_", 0), assignment)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 132dd4b119469..5b21fc90f402c 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundStats; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; @@ -183,14 +184,18 @@ public void allocate(RoutingAllocation allocation, ActionListener listener } @Override - protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { + protected void reconcile( + DesiredBalance desiredBalance, + RoutingAllocation allocation, + BalancingRoundStats.Builder statsBuilder + ) { // do nothing as balance is not computed yet (during allocate) } @Override - protected void submitReconcileTask(DesiredBalance desiredBalance) { + 
protected void submitReconcileTask(DesiredBalance desiredBalance, BalancingRoundStats.Builder statsBuilder) { // reconcile synchronously rather than in cluster state update task - super.reconcile(desiredBalance, lastAllocation); + super.reconcile(desiredBalance, lastAllocation, statsBuilder); } }; }