Skip to content

Commit b1e6908

Browse files
Add node weight changes to balance round summaries (#122195)
The node weight changes between two balancer rounds are summarized by saving the old DesiredBalance's weights per node along with a weights diff to reach the new DesiredBalance's weights per node. This supports combining multiple summaries by using the oldest summary's base node weights and summing the diffs across all summaries to reach a combined node weight diffs.
1 parent feb3a60 commit b1e6908

File tree

8 files changed

+545
-79
lines changed

8 files changed

+545
-79
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import org.elasticsearch.threadpool.ThreadPool;
1919

2020
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.Map;
2123
import java.util.concurrent.ConcurrentLinkedQueue;
2224
import java.util.concurrent.atomic.AtomicReference;
2325

@@ -83,6 +85,74 @@ public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSett
8385
});
8486
}
8587

88+
/**
89+
* Summarizes the work required to move from an old to new desired balance shard allocation.
90+
*/
91+
public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
92+
return new BalancingRoundSummary(
93+
createWeightsSummary(oldDesiredBalance, newDesiredBalance),
94+
DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance)
95+
);
96+
}
97+
98+
/**
99+
* Creates a summary of the node weight changes from {@code oldDesiredBalance} to {@code newDesiredBalance}.
100+
* See {@link BalancingRoundSummary.NodesWeightsChanges} for content details.
101+
*/
102+
private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
103+
DesiredBalance oldDesiredBalance,
104+
DesiredBalance newDesiredBalance
105+
) {
106+
var oldWeightsPerNode = oldDesiredBalance.weightsPerNode();
107+
var newWeightsPerNode = newDesiredBalance.weightsPerNode();
108+
109+
Map<String, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
110+
for (var nodeAndWeights : oldWeightsPerNode.entrySet()) {
111+
var discoveryNode = nodeAndWeights.getKey();
112+
var oldNodeWeightStats = nodeAndWeights.getValue();
113+
114+
// The node may no longer exists in the new DesiredBalance. If so, the new weights for that node are effectively zero. New
115+
// weights of zero will result in correctly negative weight diffs for the removed node.
116+
var newNodeWeightStats = newWeightsPerNode.getOrDefault(discoveryNode, DesiredBalanceMetrics.NodeWeightStats.ZERO);
117+
118+
nodeNameToWeightInfo.put(
119+
discoveryNode.getName(),
120+
new BalancingRoundSummary.NodesWeightsChanges(
121+
oldNodeWeightStats,
122+
BalancingRoundSummary.NodeWeightsDiff.create(oldNodeWeightStats, newNodeWeightStats)
123+
)
124+
);
125+
}
126+
127+
// There may be a new node in the new DesiredBalance that was not in the old DesiredBalance. So we'll need to iterate the nodes in
128+
// the new DesiredBalance to check.
129+
for (var nodeAndWeights : newWeightsPerNode.entrySet()) {
130+
var discoveryNode = nodeAndWeights.getKey();
131+
if (nodeNameToWeightInfo.containsKey(discoveryNode.getName()) == false) {
132+
// This node is new in the new DesiredBalance, there was no entry added during iteration of the nodes in the old
133+
// DesiredBalance. So we'll make a new entry with a base of zero value weights and a weights diff of the new node's weights.
134+
nodeNameToWeightInfo.put(
135+
discoveryNode.getName(),
136+
new BalancingRoundSummary.NodesWeightsChanges(
137+
DesiredBalanceMetrics.NodeWeightStats.ZERO,
138+
BalancingRoundSummary.NodeWeightsDiff.create(DesiredBalanceMetrics.NodeWeightStats.ZERO, nodeAndWeights.getValue())
139+
)
140+
);
141+
}
142+
}
143+
144+
return nodeNameToWeightInfo;
145+
}
146+
147+
/**
148+
* Creates and saves a balancer round summary for the work to move from {@code oldDesiredBalance} to {@code newDesiredBalance}. If
149+
* balancer round summaries are not enabled in the cluster (see {@link #ENABLE_BALANCER_ROUND_SUMMARIES_SETTING}), then the summary is
150+
* immediately discarded.
151+
*/
152+
public void addBalancerRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
153+
addBalancerRoundSummary(createBalancerRoundSummary(oldDesiredBalance, newDesiredBalance));
154+
}
155+
86156
/**
87157
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
88158
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
@@ -110,7 +180,7 @@ private void reportSummariesAndThenReschedule() {
110180
*/
111181
private void drainAndReportSummaries() {
112182
var combinedSummaries = drainSummaries();
113-
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
183+
if (combinedSummaries == BalancingRoundSummary.CombinedBalancingRoundSummary.EMPTY_RESULTS) {
114184
return;
115185
}
116186

@@ -120,14 +190,15 @@ private void drainAndReportSummaries() {
120190
/**
121191
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
122192
*
123-
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
193+
* @return {@link BalancingRoundSummary.CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting
194+
* to be reported.
124195
*/
125-
private CombinedBalancingRoundSummary drainSummaries() {
196+
private BalancingRoundSummary.CombinedBalancingRoundSummary drainSummaries() {
126197
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
127198
while (summaries.isEmpty() == false) {
128199
batchOfSummaries.add(summaries.poll());
129200
}
130-
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
201+
return BalancingRoundSummary.CombinedBalancingRoundSummary.combine(batchOfSummaries);
131202
}
132203

133204
/**
@@ -186,7 +257,9 @@ private void rescheduleReporting() {
186257
}
187258
}
188259

189-
// @VisibleForTesting
260+
/**
261+
* Checks that the number of entries in {@link #summaries} matches the given {@code numberOfSummaries}.
262+
*/
190263
protected void verifyNumberOfSummaries(int numberOfSummaries) {
191264
assert numberOfSummaries == summaries.size();
192265
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public void allocate(RoutingAllocation allocation) {
167167
balancer.moveShards();
168168
balancer.balance();
169169

170+
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
170171
collectAndRecordNodeWeightStats(balancer, weightFunction, allocation);
171172
}
172173

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java

Lines changed: 136 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,149 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
1216
/**
1317
* Summarizes the impact to the cluster as a result of a rebalancing round.
1418
*
15-
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
19+
* @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard
20+
* allocation to a new DesiredBalance allocation.
21+
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include
22+
* new (index creation) or removed (index deletion) shard assignements.
1623
*/
17-
public record BalancingRoundSummary(long numberOfShardsToMove) {
24+
public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToWeightChanges, long numberOfShardsToMove) {
25+
26+
/**
27+
* Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance
28+
* Saves the node weights of an old DesiredBalance, along with a diff against a newer DesiredBalance.
29+
*
30+
* @param baseWeights The starting {@link DesiredBalanceMetrics.NodeWeightStats} of a previous DesiredBalance.
31+
* @param weightsDiff The difference between the {@code baseWeights} and a new DesiredBalance.
32+
*/
33+
record NodesWeightsChanges(DesiredBalanceMetrics.NodeWeightStats baseWeights, NodeWeightsDiff weightsDiff) {}
34+
35+
/**
36+
* Represents the change of shard balance weights for a node, comparing an older DesiredBalance with the latest DesiredBalance.
37+
*
38+
* @param shardCountDiff How many more, or less, shards are assigned to the node in the latest DesiredBalance.
39+
* @param diskUsageInBytesDiff How much more, or less, disk is used by shards assigned to the node in the latest DesiredBalance.
40+
* @param writeLoadDiff How much more, or less, write load is estimated for shards assigned to the node in the latest DesiredBalance.
41+
* @param totalWeightDiff How much more, or less, the total weight is of shards assigned to the node in the latest DesiredBalance.
42+
*/
43+
record NodeWeightsDiff(long shardCountDiff, double diskUsageInBytesDiff, double writeLoadDiff, double totalWeightDiff) {
44+
45+
/**
46+
* Creates a diff where the {@code base} weights will be subtracted from the {@code next} weights, to show the changes made to reach
47+
* the {@code next} weights.
48+
*
49+
* @param base has the original weights
50+
* @param next has the new weights
51+
* @return The diff of ({@code next} - {@code base})
52+
*/
53+
public static NodeWeightsDiff create(DesiredBalanceMetrics.NodeWeightStats base, DesiredBalanceMetrics.NodeWeightStats next) {
54+
return new NodeWeightsDiff(
55+
next.shardCount() - base.shardCount(),
56+
next.diskUsageInBytes() - base.diskUsageInBytes(),
57+
next.writeLoad() - base.writeLoad(),
58+
next.nodeWeight() - base.nodeWeight()
59+
);
60+
}
61+
62+
/**
63+
* Creates a new {@link NodeWeightsDiff} summing this instance's values with {@code otherDiff}'s values.
64+
*/
65+
public NodeWeightsDiff combine(NodeWeightsDiff otherDiff) {
66+
return new NodeWeightsDiff(
67+
this.shardCountDiff + otherDiff.shardCountDiff,
68+
this.diskUsageInBytesDiff + otherDiff.diskUsageInBytesDiff,
69+
this.writeLoadDiff + otherDiff.writeLoadDiff,
70+
this.totalWeightDiff + otherDiff.totalWeightDiff
71+
);
72+
}
73+
}
1874

1975
@Override
2076
public String toString() {
21-
return "BalancingRoundSummary{" + "numberOfShardsToMove=" + numberOfShardsToMove + '}';
77+
return "BalancingRoundSummary{"
78+
+ "nodeNameToWeightChanges"
79+
+ nodeNameToWeightChanges
80+
+ ", numberOfShardsToMove="
81+
+ numberOfShardsToMove
82+
+ '}';
83+
}
84+
85+
/**
86+
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
87+
* across all those events: what allocation work was done across some period of time.
88+
* TODO: WIP ES-10341
89+
*
90+
* Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired
91+
* balance. Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is
92+
* reached. So combining them is roughly the difference between the first summary's previous desired balance and the last summary's
93+
* latest desired balance.
94+
*
95+
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
96+
* @param nodeNameToWeightChanges
97+
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
98+
*/
99+
public record CombinedBalancingRoundSummary(
100+
int numberOfBalancingRounds,
101+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges,
102+
long numberOfShardMoves
103+
) {
104+
105+
public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, new HashMap<>(), 0);
106+
107+
/**
108+
* Merges multiple {@link BalancingRoundSummary} summaries into a single {@link CombinedBalancingRoundSummary}.
109+
*/
110+
public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
111+
if (summaries.isEmpty()) {
112+
return EMPTY_RESULTS;
113+
}
114+
115+
// We will loop through the summaries and sum the weight diffs for each node entry.
116+
Map<String, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();
117+
118+
// Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
119+
// possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
120+
long numberOfShardMoves = 0;
121+
122+
// Total number of summaries that are being combined.
123+
int numSummaries = 0;
124+
125+
var iterator = summaries.iterator();
126+
while (iterator.hasNext()) {
127+
var summary = iterator.next();
128+
129+
// We'll build the weight changes by keeping the node weight base from the first summary in which a node appears and then
130+
// summing the weight diffs in each summary to get total weight diffs across summaries.
131+
for (var nodeNameAndWeights : summary.nodeNameToWeightChanges.entrySet()) {
132+
var combined = combinedNodeNameToWeightChanges.get(nodeNameAndWeights.getKey());
133+
if (combined == null) {
134+
// Either this is the first summary, and combinedNodeNameToWeightChanges hasn't been initialized yet for this node;
135+
// or a later balancing round had a new node. Either way, initialize the node entry with the weight changes from the
136+
// first summary in which it appears.
137+
combinedNodeNameToWeightChanges.put(nodeNameAndWeights.getKey(), nodeNameAndWeights.getValue());
138+
} else {
139+
// We have at least two summaries containing this node, so let's combine them.
140+
var newCombinedChanges = new NodesWeightsChanges(
141+
combined.baseWeights,
142+
combined.weightsDiff.combine(nodeNameAndWeights.getValue().weightsDiff())
143+
);
144+
combinedNodeNameToWeightChanges.put(nodeNameAndWeights.getKey(), newCombinedChanges);
145+
}
146+
}
147+
148+
++numSummaries;
149+
numberOfShardMoves += summary.numberOfShardsToMove();
150+
}
151+
152+
return new CombinedBalancingRoundSummary(numSummaries, combinedNodeNameToWeightChanges, numberOfShardMoves);
153+
}
154+
22155
}
23156

24157
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/CombinedBalancingRoundSummary.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ public class DesiredBalanceMetrics {
3838
*/
3939
public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
4040

41-
public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {}
41+
public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
42+
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
43+
}
4244

4345
// Reconciliation metrics.
4446
/** See {@link #unassignedShards} */

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
324324
}
325325

326326
if (currentDesiredBalanceRef.compareAndSet(oldDesiredBalance, newDesiredBalance)) {
327-
balancerRoundSummaryService.addBalancerRoundSummary(calculateBalancingRoundSummary(oldDesiredBalance, newDesiredBalance));
327+
balancerRoundSummaryService.addBalancerRoundSummary(oldDesiredBalance, newDesiredBalance);
328328
if (logger.isTraceEnabled()) {
329329
var diff = DesiredBalance.hasChanges(oldDesiredBalance, newDesiredBalance)
330330
? "Diff: " + DesiredBalance.humanReadableDiff(oldDesiredBalance, newDesiredBalance)
@@ -339,13 +339,6 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
339339
}
340340
}
341341

342-
/**
343-
* Summarizes the work required to move from an old to new desired balance shard allocation.
344-
*/
345-
private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
346-
return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
347-
}
348-
349342
/**
350343
* Submits the desired balance to be reconciled (applies the desired changes to the routing table) and creates and publishes a new
351344
* cluster state. The data nodes will receive and apply the new cluster state to start/move/remove shards.

0 commit comments

Comments
 (0)