Skip to content

Commit 633879e

Browse files
Add node weight changes to balance round summaries
1 parent 2bf9874 commit 633879e

File tree

7 files changed

+534
-78
lines changed

7 files changed

+534
-78
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import org.elasticsearch.threadpool.ThreadPool;
1919

2020
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.Map;
2123
import java.util.concurrent.ConcurrentLinkedQueue;
2224
import java.util.concurrent.atomic.AtomicReference;
2325

@@ -83,10 +85,90 @@ public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSett
8385
});
8486
}
8587

88+
/**
89+
* Summarizes the work required to move from an old to new desired balance shard allocation.
90+
*/
91+
public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
92+
return new BalancingRoundSummary(
93+
createWeightsSummary(oldDesiredBalance, newDesiredBalance),
94+
DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance)
95+
);
96+
}
97+
98+
/**
99+
* Creates a summary of the node weight changes from {@code oldDesiredBalance} to {@code newDesiredBalance}.
100+
* See {@link BalancingRoundSummary.NodesWeightsChanges} for content details.
101+
*
102+
* @VisibleForTesting
103+
*/
104+
protected static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
105+
DesiredBalance oldDesiredBalance,
106+
DesiredBalance newDesiredBalance
107+
) {
108+
var oldWeightsPerNode = oldDesiredBalance.weightsPerNode();
109+
var newWeightsPerNode = newDesiredBalance.weightsPerNode();
110+
111+
Map<String, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
112+
for (var nodeAndWeights : oldWeightsPerNode.entrySet()) {
113+
var discoveryNode = nodeAndWeights.getKey();
114+
var oldNodeWeightStats = nodeAndWeights.getValue();
115+
var newNodeWeightStats = newWeightsPerNode.get(discoveryNode);
116+
117+
if (newNodeWeightStats == null) {
118+
// The node no longer exists in the new DesiredBalance. The weight diffs will be equal to the negative of the old
119+
// DesiredBalance weights to reflect a new DesiredBalance of zero for each weight.
120+
nodeNameToWeightInfo.put(
121+
discoveryNode.getName(),
122+
new BalancingRoundSummary.NodesWeightsChanges(
123+
oldNodeWeightStats,
124+
new BalancingRoundSummary.NodeWeightsDiff(
125+
-oldNodeWeightStats.shardCount(),
126+
-oldNodeWeightStats.diskUsageInBytes(),
127+
-oldNodeWeightStats.writeLoad(),
128+
-oldNodeWeightStats.nodeWeight()
129+
)
130+
)
131+
);
132+
continue;
133+
}
134+
135+
nodeNameToWeightInfo.put(
136+
discoveryNode.getName(),
137+
new BalancingRoundSummary.NodesWeightsChanges(
138+
oldNodeWeightStats,
139+
BalancingRoundSummary.NodeWeightsDiff.create(oldNodeWeightStats, newNodeWeightStats)
140+
)
141+
);
142+
}
143+
144+
// There may be a new node in the new DesiredBalance that was not in the old DesiredBalance. So we'll need to iterate the nodes in
145+
// the new DesiredBalance to check.
146+
for (var nodeAndWeights : newWeightsPerNode.entrySet()) {
147+
var discoveryNode = nodeAndWeights.getKey();
148+
if (nodeNameToWeightInfo.containsKey(discoveryNode.getName()) == false) {
149+
// This node is new in the new DesiredBalance, there is no entry in the result yet because we previously iterated the nodes
150+
// in the old DesiredBalance. So we'll make a new entry with a base of zero value weights and a weights diff of the new
151+
// node's weights.
152+
var zeroBaseWeights = new DesiredBalanceMetrics.NodeWeightStats(0, 0, 0, 0);
153+
nodeNameToWeightInfo.put(
154+
discoveryNode.getName(),
155+
new BalancingRoundSummary.NodesWeightsChanges(
156+
zeroBaseWeights,
157+
BalancingRoundSummary.NodeWeightsDiff.create(zeroBaseWeights, nodeAndWeights.getValue())
158+
)
159+
);
160+
}
161+
}
162+
163+
return nodeNameToWeightInfo;
164+
}
165+
86166
/**
87167
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
88168
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
89169
* never be drained).
170+
*
171+
* @VisibleForTesting
90172
*/
91173
public void addBalancerRoundSummary(BalancingRoundSummary summary) {
92174
if (enableBalancerRoundSummaries == false) {
@@ -110,7 +192,7 @@ private void reportSummariesAndThenReschedule() {
110192
*/
111193
private void drainAndReportSummaries() {
112194
var combinedSummaries = drainSummaries();
113-
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
195+
if (combinedSummaries == BalancingRoundSummary.CombinedBalancingRoundSummary.EMPTY_RESULTS) {
114196
return;
115197
}
116198

@@ -120,14 +202,14 @@ private void drainAndReportSummaries() {
120202
/**
121203
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
122204
*
123-
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
205+
* @return {@link BalancingRoundSummary.CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
124206
*/
125-
private CombinedBalancingRoundSummary drainSummaries() {
207+
private BalancingRoundSummary.CombinedBalancingRoundSummary drainSummaries() {
126208
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
127209
while (summaries.isEmpty() == false) {
128210
batchOfSummaries.add(summaries.poll());
129211
}
130-
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
212+
return BalancingRoundSummary.CombinedBalancingRoundSummary.combine(batchOfSummaries);
131213
}
132214

133215
/**
@@ -186,7 +268,11 @@ private void rescheduleReporting() {
186268
}
187269
}
188270

189-
// @VisibleForTesting
271+
/**
272+
* Checks that the number of entries in {@link #summaries} matches the given {@code numberOfSummaries}.
273+
*
274+
* @VisibleForTesting
275+
*/
190276
protected void verifyNumberOfSummaries(int numberOfSummaries) {
191277
assert numberOfSummaries == summaries.size();
192278
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ public void allocate(RoutingAllocation allocation) {
162162
balancer.moveShards();
163163
balancer.balance();
164164

165+
// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
165166
collectAndRecordNodeWeightStats(balancer, weightFunction, allocation);
166167
}
167168

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancingRoundSummary.java

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,129 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
1216
/**
1317
* Summarizes the impact to the cluster as a result of a rebalancing round.
1418
*
15-
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
19+
* @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard
20+
* allocation to a new DesiredBalance allocation.
21+
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include
22+
* new (index creation) or removed (index deletion) shard assignements.
1623
*/
17-
public record BalancingRoundSummary(long numberOfShardsToMove) {
24+
public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToWeightChanges, long numberOfShardsToMove) {
25+
26+
/**
27+
* Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance
28+
* Saves the node weights of an old DesiredBalance, along with a diff against a newer DesiredBalance.
29+
*
30+
* @param weights The starting {@link DesiredBalanceMetrics.NodeWeightStats} of a previous DesiredBalance.
31+
* @param nextWeightsDiff The difference between a previous DesiredBalance and a new DesiredBalance.
32+
*/
33+
record NodesWeightsChanges(DesiredBalanceMetrics.NodeWeightStats weights, NodeWeightsDiff nextWeightsDiff) {}
34+
35+
/**
36+
* Represents the change of shard balance weights for a node, comparing an older DesiredBalance with the latest DesiredBalance.
37+
*
38+
* @param shardCountDiff How many more, or less, shards are assigned to the node in the latest DesiredBalance.
39+
* @param diskUsageInBytesDiff How much more, or less, disk is used by shards assigned to the node in the latest DesiredBalance.
40+
* @param writeLoadDiff How much more, or less, write load is estimated for shards assigned to the node in the latest DesiredBalance.
41+
* @param totalWeightDiff How much more, or less, the total weight is of shards assigned to the node in the latest DesiredBalance.
42+
*/
43+
record NodeWeightsDiff(long shardCountDiff, double diskUsageInBytesDiff, double writeLoadDiff, double totalWeightDiff) {
44+
public static NodeWeightsDiff create(DesiredBalanceMetrics.NodeWeightStats base, DesiredBalanceMetrics.NodeWeightStats next) {
45+
return new NodeWeightsDiff(
46+
next.shardCount() - base.shardCount(),
47+
next.diskUsageInBytes() - base.diskUsageInBytes(),
48+
next.writeLoad() - base.writeLoad(),
49+
next.nodeWeight() - base.nodeWeight()
50+
);
51+
}
52+
}
1853

1954
@Override
2055
public String toString() {
21-
return "BalancingRoundSummary{" + "numberOfShardsToMove=" + numberOfShardsToMove + '}';
56+
return "BalancingRoundSummary{"
57+
+ "nodeNameToWeightChanges"
58+
+ nodeNameToWeightChanges
59+
+ ", numberOfShardsToMove="
60+
+ numberOfShardsToMove
61+
+ '}';
62+
}
63+
64+
/**
65+
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
66+
* across all those events: what allocation work was done across some period of time.
67+
* TODO: WIP ES-10341
68+
*
69+
* Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired balance.
70+
* Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is reached. So
71+
* combining them is roughly the difference between the first summary's previous desired balance and the last summary's latest desired
72+
* balance.
73+
*
74+
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
75+
* @param nodeNameToWeightChanges
76+
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
77+
*/
78+
public record CombinedBalancingRoundSummary(
79+
int numberOfBalancingRounds,
80+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges,
81+
long numberOfShardMoves
82+
) {
83+
84+
public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, new HashMap<>(), 0);
85+
86+
public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
87+
if (summaries.isEmpty()) {
88+
return EMPTY_RESULTS;
89+
}
90+
91+
// Initialize the combined weight changes with the oldest changes. We can then build the combined changes by adding the diffs of
92+
// newer weight changes. If a new node gets added in a later summary, then we will initialize its weights starting there.
93+
// Similarly, a node may be removed in a later summary: in this case we will keep that nodes work, up until it was removed.
94+
var iterator = summaries.iterator();
95+
assert iterator.hasNext();
96+
var firstSummary = iterator.next();
97+
Map<String, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>(firstSummary.nodeNameToWeightChanges);
98+
99+
// Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
100+
// possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
101+
long numberOfShardMoves = firstSummary.numberOfShardsToMove;
102+
103+
// Initialize with 1 because we've already begun to iterate the summaries.
104+
int numSummaries = 1;
105+
106+
// Iterate any remaining summaries (after the first one).
107+
while (iterator.hasNext()) {
108+
var summary = iterator.next();
109+
for (var nodeNameAndWeights : summary.nodeNameToWeightChanges.entrySet()) {
110+
var combined = combinedNodeNameToWeightChanges.get(nodeNameAndWeights.getKey());
111+
if (combined == null) {
112+
// Encountered a new node in a later summary. Add the new node initializing it with the base weights from that
113+
// summary.
114+
combinedNodeNameToWeightChanges.put(nodeNameAndWeights.getKey(), nodeNameAndWeights.getValue());
115+
} else {
116+
var newCombinedDiff = new NodeWeightsDiff(
117+
combined.nextWeightsDiff.shardCountDiff + nodeNameAndWeights.getValue().nextWeightsDiff.shardCountDiff,
118+
combined.nextWeightsDiff.diskUsageInBytesDiff + nodeNameAndWeights
119+
.getValue().nextWeightsDiff.diskUsageInBytesDiff,
120+
combined.nextWeightsDiff.writeLoadDiff + nodeNameAndWeights.getValue().nextWeightsDiff.writeLoadDiff,
121+
combined.nextWeightsDiff.totalWeightDiff + nodeNameAndWeights.getValue().nextWeightsDiff.totalWeightDiff
122+
);
123+
var newCombinedChanges = new NodesWeightsChanges(combined.weights, newCombinedDiff);
124+
combinedNodeNameToWeightChanges.compute(nodeNameAndWeights.getKey(), (k, weightChanges) -> newCombinedChanges);
125+
}
126+
}
127+
128+
++numSummaries;
129+
numberOfShardMoves += summary.numberOfShardsToMove();
130+
}
131+
132+
return new CombinedBalancingRoundSummary(numSummaries, combinedNodeNameToWeightChanges, numberOfShardMoves);
133+
}
134+
22135
}
23136

24137
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/CombinedBalancingRoundSummary.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
325325
}
326326

327327
if (currentDesiredBalanceRef.compareAndSet(oldDesiredBalance, newDesiredBalance)) {
328-
balancerRoundSummaryService.addBalancerRoundSummary(calculateBalancingRoundSummary(oldDesiredBalance, newDesiredBalance));
328+
balancerRoundSummaryService.addBalancerRoundSummary(
329+
AllocationBalancingRoundSummaryService.createBalancerRoundSummary(oldDesiredBalance, newDesiredBalance)
330+
);
329331
if (logger.isTraceEnabled()) {
330332
var diff = DesiredBalance.hasChanges(oldDesiredBalance, newDesiredBalance)
331333
? "Diff: " + DesiredBalance.humanReadableDiff(oldDesiredBalance, newDesiredBalance)
@@ -340,13 +342,6 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
340342
}
341343
}
342344

343-
/**
344-
* Summarizes the work required to move from an old to new desired balance shard allocation.
345-
*/
346-
private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
347-
return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
348-
}
349-
350345
/**
 * Wraps {@code desiredBalance} in a new {@code ReconcileDesiredBalanceTask} and submits it to {@code masterServiceTaskQueue} with
 * {@code "reconcile-desired-balance"} as the first argument and {@code null} as the last.
 */
protected void submitReconcileTask(DesiredBalance desiredBalance) {
351346
masterServiceTaskQueue.submitTask("reconcile-desired-balance", new ReconcileDesiredBalanceTask(desiredBalance), null);
352347
}

0 commit comments

Comments
 (0)