Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -83,10 +85,90 @@ public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSett
});
}

/**
 * Builds the {@link BalancingRoundSummary} for the transition from {@code oldDesiredBalance} to {@code newDesiredBalance}.
 * The summary combines the per-node weight changes from {@link #createWeightsSummary} with the result of
 * {@code DesiredBalance.shardMovements} for the same pair of balances.
 *
 * @param oldDesiredBalance the desired balance being replaced
 * @param newDesiredBalance the desired balance replacing it
 * @return a summary of the work required to move from the old to the new desired balance
 */
public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
    var weightChangesByNodeName = createWeightsSummary(oldDesiredBalance, newDesiredBalance);
    var shardMoves = DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance);
    return new BalancingRoundSummary(weightChangesByNodeName, shardMoves);
}

/**
 * Builds a per-node summary, keyed by node name, of how node weights change from {@code oldDesiredBalance} to
 * {@code newDesiredBalance}. See {@link BalancingRoundSummary.NodesWeightsChanges} for content details.
 * <ul>
 * <li>Node present in both balances: the old weights plus the diff produced by {@code NodeWeightsDiff.create}.</li>
 * <li>Node present only in the old balance: the old weights plus a diff equal to the negated old weights.</li>
 * <li>Node present only in the new balance: zero base weights plus the diff of the new weights against those zeros.</li>
 * </ul>
 *
 * @VisibleForTesting
 */
protected static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
    DesiredBalance oldDesiredBalance,
    DesiredBalance newDesiredBalance
) {
    var previousWeights = oldDesiredBalance.weightsPerNode();
    var latestWeights = newDesiredBalance.weightsPerNode();

    Map<String, BalancingRoundSummary.NodesWeightsChanges> changesByNodeName = new HashMap<>(previousWeights.size());

    // First pass: every node known to the old DesiredBalance.
    for (var entry : previousWeights.entrySet()) {
        var node = entry.getKey();
        var baseStats = entry.getValue();
        var latestStats = latestWeights.get(node);

        // When the node no longer exists in the new DesiredBalance, the diff is the negative of the old weights, reflecting a new
        // DesiredBalance of zero for each weight.
        final BalancingRoundSummary.NodeWeightsDiff diff = latestStats != null
            ? BalancingRoundSummary.NodeWeightsDiff.create(baseStats, latestStats)
            : new BalancingRoundSummary.NodeWeightsDiff(
                -baseStats.shardCount(),
                -baseStats.diskUsageInBytes(),
                -baseStats.writeLoad(),
                -baseStats.nodeWeight()
            );

        changesByNodeName.put(node.getName(), new BalancingRoundSummary.NodesWeightsChanges(baseStats, diff));
    }

    // Second pass: nodes that only appear in the new DesiredBalance. They have no entry yet because the first pass only visited the
    // nodes of the old DesiredBalance.
    for (var entry : latestWeights.entrySet()) {
        var nodeName = entry.getKey().getName();
        if (changesByNodeName.containsKey(nodeName)) {
            continue;
        }
        // Base of zero-valued weights, with a diff equal to the new node's weights.
        var zeroStats = new DesiredBalanceMetrics.NodeWeightStats(0, 0, 0, 0);
        var diffFromZero = BalancingRoundSummary.NodeWeightsDiff.create(zeroStats, entry.getValue());
        changesByNodeName.put(nodeName, new BalancingRoundSummary.NodesWeightsChanges(zeroStats, diffFromZero));
    }

    return changesByNodeName;
}

/**
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
* never be drained).
*
* @VisibleForTesting
*/
public void addBalancerRoundSummary(BalancingRoundSummary summary) {
if (enableBalancerRoundSummaries == false) {
Expand All @@ -110,7 +192,7 @@ private void reportSummariesAndThenReschedule() {
*/
private void drainAndReportSummaries() {
var combinedSummaries = drainSummaries();
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
if (combinedSummaries == BalancingRoundSummary.CombinedBalancingRoundSummary.EMPTY_RESULTS) {
return;
}

Expand All @@ -120,14 +202,14 @@ private void drainAndReportSummaries() {
/**
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
*
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
* @return {@link BalancingRoundSummary.CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
*/
private CombinedBalancingRoundSummary drainSummaries() {
private BalancingRoundSummary.CombinedBalancingRoundSummary drainSummaries() {
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
while (summaries.isEmpty() == false) {
batchOfSummaries.add(summaries.poll());
}
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
return BalancingRoundSummary.CombinedBalancingRoundSummary.combine(batchOfSummaries);
}

/**
Expand Down Expand Up @@ -186,7 +268,11 @@ private void rescheduleReporting() {
}
}

// @VisibleForTesting
/**
 * Checks that the number of entries in {@link #summaries} matches the given {@code numberOfSummaries}. The check is a Java
 * {@code assert}, so it only has an effect when assertions are enabled.
 *
 * @param numberOfSummaries the expected number of summaries currently held
 *
 * @VisibleForTesting
 */
protected void verifyNumberOfSummaries(int numberOfSummaries) {
    // The detail message is only evaluated when the assertion fails.
    assert numberOfSummaries == summaries.size()
        : "expected " + numberOfSummaries + " balancing round summaries but found " + summaries.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public void allocate(RoutingAllocation allocation) {
balancer.moveShards();
balancer.balance();

// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
collectAndRecordNodeWeightStats(balancer, weightFunction, allocation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,129 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Summarizes the impact to the cluster as a result of a rebalancing round.
*
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
* @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard
* allocation to a new DesiredBalance allocation.
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include
* new (index creation) or removed (index deletion) shard assignments.
*/
public record BalancingRoundSummary(long numberOfShardsToMove) {
public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToWeightChanges, long numberOfShardsToMove) {

/**
 * Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance.
 * Saves the node weights of an old DesiredBalance, along with a diff against a newer DesiredBalance.
 *
 * @param weights The starting {@link DesiredBalanceMetrics.NodeWeightStats} of a previous DesiredBalance.
 * @param nextWeightsDiff The difference between a previous DesiredBalance and a new DesiredBalance, expressed as a
 *                        {@link NodeWeightsDiff}.
 */
record NodesWeightsChanges(DesiredBalanceMetrics.NodeWeightStats weights, NodeWeightsDiff nextWeightsDiff) {}

/**
 * Describes how a node's shard balance weights differ between an older DesiredBalance and the latest DesiredBalance.
 *
 * @param shardCountDiff How many more, or fewer, shards are assigned to the node in the latest DesiredBalance.
 * @param diskUsageInBytesDiff How much more, or less, disk is used by shards assigned to the node in the latest DesiredBalance.
 * @param writeLoadDiff How much more, or less, write load is estimated for shards assigned to the node in the latest DesiredBalance.
 * @param totalWeightDiff How much more, or less, the total weight is of shards assigned to the node in the latest DesiredBalance.
 */
record NodeWeightsDiff(long shardCountDiff, double diskUsageInBytesDiff, double writeLoadDiff, double totalWeightDiff) {
    /**
     * Builds a diff by subtracting each weight of {@code base} from the matching weight of {@code next}.
     *
     * @param base the weights to diff from
     * @param next the weights to diff to
     * @return {@code next} minus {@code base}, component by component
     */
    public static NodeWeightsDiff create(DesiredBalanceMetrics.NodeWeightStats base, DesiredBalanceMetrics.NodeWeightStats next) {
        final long shardCountDelta = next.shardCount() - base.shardCount();
        final double diskUsageDelta = next.diskUsageInBytes() - base.diskUsageInBytes();
        final double writeLoadDelta = next.writeLoad() - base.writeLoad();
        final double nodeWeightDelta = next.nodeWeight() - base.nodeWeight();
        return new NodeWeightsDiff(shardCountDelta, diskUsageDelta, writeLoadDelta, nodeWeightDelta);
    }
}

/**
 * Renders this summary as {@code BalancingRoundSummary{nodeNameToWeightChanges=..., numberOfShardsToMove=...}}.
 */
@Override
public String toString() {
    return "BalancingRoundSummary{"
        + "nodeNameToWeightChanges="
        + nodeNameToWeightChanges
        + ", numberOfShardsToMove="
        + numberOfShardsToMove
        + '}';
}

/**
 * Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
 * across all those events: what allocation work was done across some period of time.
 * TODO: WIP ES-10341
 *
 * Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired
 * balance. Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance
 * is reached. So combining them is roughly the difference between the first summary's previous desired balance and the last summary's
 * latest desired balance.
 *
 * @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
 * @param nodeNameToWeightChanges For each node (by name): the base weights from the first combined summary that mentions the node,
 *                                together with the sum of that node's weight diffs across all combined summaries.
 * @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
 */
public record CombinedBalancingRoundSummary(
    int numberOfBalancingRounds,
    Map<String, NodesWeightsChanges> nodeNameToWeightChanges,
    long numberOfShardMoves
) {

    /**
     * Shared result returned by {@link #combine} when there is nothing to combine. It is backed by an immutable map so that the
     * shared instance cannot be modified through its accessor.
     */
    public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, Map.of(), 0);

    /**
     * Combines the given summaries, which are expected to be ordered from oldest to newest, into a single report.
     *
     * @param summaries the balancing round summaries to combine
     * @return {@link #EMPTY_RESULTS} if {@code summaries} is empty, otherwise the combined summary
     */
    public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
        if (summaries.isEmpty()) {
            return EMPTY_RESULTS;
        }

        // The combined weight changes start from the oldest summary and are built up by adding the diffs of newer summaries. If a new
        // node gets added in a later summary, then its weights are initialized starting there. Similarly, a node may be removed in a
        // later summary: in this case that node's work is kept, up until it was removed.
        Map<String, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();

        // Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
        // possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
        long numberOfShardMoves = 0;

        for (var summary : summaries) {
            for (var nodeNameAndWeights : summary.nodeNameToWeightChanges().entrySet()) {
                var nodeName = nodeNameAndWeights.getKey();
                var laterChanges = nodeNameAndWeights.getValue();
                var combined = combinedNodeNameToWeightChanges.get(nodeName);
                // A node seen for the first time is initialized with the changes (and base weights) of the summary it first appears in.
                combinedNodeNameToWeightChanges.put(nodeName, combined == null ? laterChanges : accumulate(combined, laterChanges));
            }
            numberOfShardMoves += summary.numberOfShardsToMove();
        }

        return new CombinedBalancingRoundSummary(summaries.size(), combinedNodeNameToWeightChanges, numberOfShardMoves);
    }

    /** Keeps the base weights of {@code combined} and adds the diff of {@code later} onto the diff of {@code combined}. */
    private static NodesWeightsChanges accumulate(NodesWeightsChanges combined, NodesWeightsChanges later) {
        var combinedDiff = combined.nextWeightsDiff();
        var laterDiff = later.nextWeightsDiff();
        return new NodesWeightsChanges(
            combined.weights(),
            new NodeWeightsDiff(
                combinedDiff.shardCountDiff() + laterDiff.shardCountDiff(),
                combinedDiff.diskUsageInBytesDiff() + laterDiff.diskUsageInBytesDiff(),
                combinedDiff.writeLoadDiff() + laterDiff.writeLoadDiff(),
                combinedDiff.totalWeightDiff() + laterDiff.totalWeightDiff()
            )
        );
    }

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
}

if (currentDesiredBalanceRef.compareAndSet(oldDesiredBalance, newDesiredBalance)) {
balancerRoundSummaryService.addBalancerRoundSummary(calculateBalancingRoundSummary(oldDesiredBalance, newDesiredBalance));
balancerRoundSummaryService.addBalancerRoundSummary(
AllocationBalancingRoundSummaryService.createBalancerRoundSummary(oldDesiredBalance, newDesiredBalance)
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I wonder if it's worth changing the signature (and perhaps the name, to reflect the change) of addBalancerRoundSummary so that it just takes the old and new desired balances? That would reduce some of the coupling between DesiredBalanceShardsAllocator and the AllocationBalancingRoundSummaryService.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could do that and keep the testability (it could just call a static method on itself that could be exposed for testing)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand your suggestion. If I change the method to

public void addBalancerRoundSummary(DesiredBalance oldDesired, DesiredBalance newDesired)

Then for testing addBalancerRoundSummary I'll have to create DesiredBalance instances and reason about the BalancingRoundSummary that results. Right now it's simpler to test addBalancerRoundSummary.

The createBalancerRoundSummary method could still be tested the same way even if it was internal (exposed for testing, as you say). But addBalancerRoundSummary seems more complicated, unless I'm missing something 🤔

Copy link
Contributor

@nicktindall nicktindall Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I just meant maybe it's easier, for the caller, to provide an overload e.g.

class AllocationBalancerRoundSummary {

   public void addBalancerRoundSummary(DesiredBalance old, DesiredBalance new) {
      addBalancerRoundSummary(createBalancerRoundSummary(old, new));
   }

   public void addBalancerRoundSummary(BalancingRoundSummary summary) {
      // ...
   }

   public static BalancingRoundSummary createBalancingRoundSummary(DesiredBalance old, DesiredBalance new) { 
      // ...
   }
}

That way DesiredBalanceShardAllocator doesn't need to know about createBalancingRoundSummary?

The overload being just a convenience that I'd argue doesn't need to be explicitly tested itself because it's just delegating to things that are tested.

But no strong feelings about this one. Up to you :)

Copy link
Contributor Author

@DiannaHohensee DiannaHohensee Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, thanks for explaining. Sure, done in 3df5904

if (logger.isTraceEnabled()) {
var diff = DesiredBalance.hasChanges(oldDesiredBalance, newDesiredBalance)
? "Diff: " + DesiredBalance.humanReadableDiff(oldDesiredBalance, newDesiredBalance)
Expand All @@ -340,13 +342,6 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
}
}

/**
* Summarizes the work required to move from an old to new desired balance shard allocation.
*/
private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved this logic into the AllocationBalancingRoundSummaryService to make it more unit testable. The summaries are also going to become more complex as I add more summary stats/metrics: that logic seems more appropriate in the summary service.

return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
}

/**
 * Wraps {@code desiredBalance} in a {@link ReconcileDesiredBalanceTask} and submits it to {@code masterServiceTaskQueue} under the
 * source name {@code "reconcile-desired-balance"}.
 *
 * @param desiredBalance the desired balance handed to the reconcile task
 */
protected void submitReconcileTask(DesiredBalance desiredBalance) {
    final var reconcileTask = new ReconcileDesiredBalanceTask(desiredBalance);
    // NOTE(review): the third argument is passed as null; presumably this means "no timeout" — confirm against submitTask.
    masterServiceTaskQueue.submitTask("reconcile-desired-balance", reconcileTask, null);
}
Expand Down
Loading