Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.allocator;

/**
* Data structure to pass allocation statistics between the desired balance classes.
*/
public record BalancingRoundStats(
long unassignedShards,
long totalAllocations,
long undesiredAllocationsExcludingShuttingDownNodes,
boolean rebalancing
) {

/**
* Tracks in-progress balancing round statistics.
*/
public static class Builder {
long unassignedShards = 0;
long totalAllocations = 0;
long undesiredAllocationsExcludingShuttingDownNodes = 0;
boolean rebalancing = false;

public BalancingRoundStats build() {
return new BalancingRoundStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, rebalancing);
}

public void setUnassignedShards(long numUnassignedShards) {
this.unassignedShards = numUnassignedShards;
}

public void incTotalAllocations() {
++this.totalAllocations;
}

public void incUndesiredAllocationsExcludingShuttingDownNodes() {
++this.undesiredAllocationsExcludingShuttingDownNodes;
}

public long getTotalAllocations() {
return this.totalAllocations;
}

public long getUndesiredAllocationsExcludingShuttingDownNodes() {
return this.undesiredAllocationsExcludingShuttingDownNodes;
}

public void setNoRebalancing() {
this.rebalancing = false;
}

}
};
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,21 @@
* in this sequence.
* @param routingAllocation a copy of (the immutable parts of) the context for the allocation decision process
* @param ignoredShards a list of the shards for which earlier allocators have claimed responsibility
* @param clusterAllocationStatsBuilder Tracks the statistics for this balancing round
*/
public record DesiredBalanceInput(long index, RoutingAllocation routingAllocation, List<ShardRouting> ignoredShards) {
public record DesiredBalanceInput(
long index,
RoutingAllocation routingAllocation,
List<ShardRouting> ignoredShards,
BalancingRoundStats.Builder clusterAllocationStatsBuilder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it's a bit confusing how there's a stats builder on the input, but we only use it for the eager reconciliation (assuming I've understood the code right). Could we instead just create one specifically for the eager reconciliation to make it clear it's local to that process?

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. What part are you referring to as eager reconciliation? Is it this balance() method?

My intention with passing around a stats data structure is to collect additional statistics throughout the balancing round -- for example, I'll want balancing round start and end times, so I'll need stats tracking as early as this line before computation begins. A balancing round begins with a DesiredBalanceInput submitted to the ContinuousComputation object, which queues requests for a balancing round: the latest DesiredBalanceInput submission will be run async.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean in allocate() we call reconcile(...) with desiredBalanceInput.clusterAllocationStatsBuilder() but then in the continuous computation we create separate StatsBuilders to pass to submitReconcileTask, even though desiredBalanceInput is in scope.
I just wonder whether the statsBuilder belongs in desiredBalanceInput? but I find the code a bit hard to follow so maybe there's an obvious reason for it being there.
I think I'm struggling to understand the lifecycle of the builder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh. I think you found something buggy. So allocate will queue a DesiredBalanceInput for computation, which gets picked up asynchronously. But then reconcile() is run immediately and updates the metrics. Computation happens eventually.

I thought I was being thorough by passing the stats builder everywhere, but reconciliation always runs whereas computation occasionally runs (queues, so an input will get skipped if there's a new input). And then computation will normally lead to reconciliation being called.

Okay... So allocate() needs two separate StatsBuilders, one for immediate reconciliation, and the other passed to computation but ultimately reconciliation. Or alternatively we skip instrumenting computation and create fresh StatsBuilders for each reconciliation event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there will be some use of the StatsBuilder in the computation phase (is that what // TODO: this will be expanded upon shortly in ES-10341. refers to?)

If so, is is as simple as just

  • Removing StatsBuilder from the DesiredBalanceInput
  • Creating one locally to pass to reconcile(...) in allocate(...)
  • Using the existing local StatsBuilder in processInput

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I've updated this patch to be purely refactor work. It doesn't lead into new work anymore. I think it's cleaner without pushing desired metrics objects into the Reconciler to handle: this cleans up the constructor and purpose of the Reconciler.

) {

public static DesiredBalanceInput create(long index, RoutingAllocation routingAllocation) {
return new DesiredBalanceInput(
index,
routingAllocation.immutableClone(),
List.copyOf(routingAllocation.routingNodes().unassigned().ignored())
List.copyOf(routingAllocation.routingNodes().unassigned().ignored()),
new BalancingRoundStats.Builder()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,8 @@
*/
public class DesiredBalanceMetrics {

public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}

public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {}

public static final DesiredBalanceMetrics NOOP = new DesiredBalanceMetrics(MeterRegistry.NOOP);

public static final String UNASSIGNED_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.unassigned.current";
public static final String TOTAL_SHARDS_METRIC_NAME = "es.allocator.desired_balance.shards.current";
public static final String UNDESIRED_ALLOCATION_COUNT_METRIC_NAME = "es.allocator.desired_balance.allocations.undesired.current";
Expand All @@ -56,8 +52,6 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";

public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1);

private volatile boolean nodeIsMaster = false;
/**
* Number of unassigned shards during last reconciliation
Expand All @@ -80,16 +74,16 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
);

public void updateMetrics(
AllocationStats allocationStats,
BalancingRoundStats balancingRoundStats,
Map<DiscoveryNode, NodeWeightStats> weightStatsPerNode,
Map<DiscoveryNode, NodeAllocationStatsAndWeight> nodeAllocationStats
) {
assert allocationStats != null : "allocation stats cannot be null";
assert balancingRoundStats != null : "allocation stats cannot be null";
assert weightStatsPerNode != null : "node balance weight stats cannot be null";
if (allocationStats != EMPTY_ALLOCATION_STATS) {
this.unassignedShards = allocationStats.unassignedShards;
this.totalAllocations = allocationStats.totalAllocations;
this.undesiredAllocations = allocationStats.undesiredAllocationsExcludingShuttingDownNodes;
if (balancingRoundStats.rebalancing()) {
this.unassignedShards = balancingRoundStats.unassignedShards();
this.totalAllocations = balancingRoundStats.totalAllocations();
this.undesiredAllocations = balancingRoundStats.undesiredAllocationsExcludingShuttingDownNodes();
}
weightStatsPerNodeRef.set(weightStatsPerNode);
allocationStatsPerNodeRef.set(nodeAllocationStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics.AllocationStats;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -36,9 +33,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,16 +77,8 @@ public class DesiredBalanceReconciler {
private double undesiredAllocationsLogThreshold;
private final NodeAllocationOrdering allocationOrdering = new NodeAllocationOrdering();
private final NodeAllocationOrdering moveOrdering = new NodeAllocationOrdering();
private final DesiredBalanceMetrics desiredBalanceMetrics;
private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator;

public DesiredBalanceReconciler(
ClusterSettings clusterSettings,
ThreadPool threadPool,
DesiredBalanceMetrics desiredBalanceMetrics,
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
) {
this.desiredBalanceMetrics = desiredBalanceMetrics;

public DesiredBalanceReconciler(ClusterSettings clusterSettings, ThreadPool threadPool) {
this.undesiredAllocationLogInterval = new FrequencyCappedAction(
threadPool.relativeTimeInMillisSupplier(),
TimeValue.timeValueMinutes(5)
Expand All @@ -101,7 +88,6 @@ public DesiredBalanceReconciler(
UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
value -> this.undesiredAllocationsLogThreshold = value
);
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
}

/**
Expand All @@ -110,12 +96,13 @@ public DesiredBalanceReconciler(
* @param desiredBalance The new desired cluster shard allocation
* @param allocation Cluster state information with which to make decisions, contains routing table metadata that will be modified to
* reach the given desired balance.
* @param statsBuilder Stats tracker for balancing round reconciliation changes.
*/
public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) {
public void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation, BalancingRoundStats.Builder statsBuilder) {
var nodeIds = allocation.routingNodes().getAllNodeIds();
allocationOrdering.retainNodes(nodeIds);
moveOrdering.retainNodes(nodeIds);
new Reconciliation(desiredBalance, allocation).run();
new Reconciliation(desiredBalance, allocation).run(statsBuilder);
}

public void clear() {
Expand All @@ -135,7 +122,7 @@ private class Reconciliation {
this.routingNodes = allocation.routingNodes();
}

void run() {
void run(BalancingRoundStats.Builder statsBuilder) {
try (var ignored = allocation.withReconcilingFlag()) {

logger.debug("Reconciling desired balance for [{}]", desiredBalance.lastConvergedIndex());
Expand Down Expand Up @@ -165,31 +152,10 @@ void run() {
moveShards();
// 3. move any other shards that are desired elsewhere
logger.trace("Reconciler#balance");
var allocationStats = balance();
balance(statsBuilder);

logger.debug("Reconciliation is complete");

updateDesireBalanceMetrics(allocationStats);
}
}

private void updateDesireBalanceMetrics(AllocationStats allocationStats) {
var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights(
allocation.metadata(),
allocation.routingNodes(),
allocation.clusterInfo(),
desiredBalance
);
Map<DiscoveryNode, NodeAllocationStatsAndWeight> filteredNodeAllocationStatsAndWeights = new HashMap<>(
nodesStatsAndWeights.size()
);
for (var nodeStatsAndWeight : nodesStatsAndWeights.entrySet()) {
var node = allocation.nodes().get(nodeStatsAndWeight.getKey());
if (node != null) {
filteredNodeAllocationStatsAndWeights.put(node, nodeStatsAndWeight.getValue());
}
}
desiredBalanceMetrics.updateMetrics(allocationStats, desiredBalance.weightsPerNode(), filteredNodeAllocationStatsAndWeights);
}

private boolean allocateUnassignedInvariant() {
Expand Down Expand Up @@ -512,22 +478,21 @@ private void moveShards() {
}
}

private AllocationStats balance() {
private void balance(BalancingRoundStats.Builder statsBuilder) {
if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
return DesiredBalanceMetrics.EMPTY_ALLOCATION_STATS;
statsBuilder.setNoRebalancing();
return;
}

int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
int totalAllocations = 0;
int undesiredAllocationsExcludingShuttingDownNodes = 0;
statsBuilder.setUnassignedShards(routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size());

// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
// shards.
for (final var iterator = OrderedShardsIterator.createForBalancing(allocation, moveOrdering); iterator.hasNext();) {
final var shardRouting = iterator.next();

totalAllocations++;
statsBuilder.incTotalAllocations();

if (shardRouting.started() == false) {
// can only rebalance started shards
Expand All @@ -546,7 +511,7 @@ private AllocationStats balance() {
}

if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
undesiredAllocationsExcludingShuttingDownNodes++;
statsBuilder.incUndesiredAllocationsExcludingShuttingDownNodes();
}

if (allocation.deciders().canRebalance(shardRouting, allocation).type() != Decision.Type.YES) {
Expand Down Expand Up @@ -580,12 +545,15 @@ private AllocationStats balance() {
}
}

maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
maybeLogUndesiredAllocationsWarning(
statsBuilder.getTotalAllocations(),
statsBuilder.getUndesiredAllocationsExcludingShuttingDownNodes(),
routingNodes.size()
);

return new AllocationStats(unassignedShards, totalAllocations, undesiredAllocationsExcludingShuttingDownNodes);
}

private void maybeLogUndesiredAllocationsWarning(int totalAllocations, int undesiredAllocations, int nodeCount) {
private void maybeLogUndesiredAllocationsWarning(long totalAllocations, long undesiredAllocations, int nodeCount) {
// more shards than cluster can relocate with one reroute
final boolean nonEmptyRelocationBacklog = undesiredAllocations > 2L * nodeCount;
final boolean warningThresholdReached = undesiredAllocations > undesiredAllocationsLogThreshold * totalAllocations;
Expand Down
Loading