@@ -27,10 +27,10 @@ public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToW
2727 * Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance
2828 * Saves the node weights of an old DesiredBalance, along with a diff against a newer DesiredBalance.
2929 *
30- * @param weights The starting {@link DesiredBalanceMetrics.NodeWeightStats} of a previous DesiredBalance.
31- * @param nextWeightsDiff The difference between a previous DesiredBalance and a new DesiredBalance.
30+ * @param baseWeights The starting {@link DesiredBalanceMetrics.NodeWeightStats} of a previous DesiredBalance.
31+ * @param weightsDiff The difference between the {@code baseWeights} and a new DesiredBalance.
3232 */
33- record NodesWeightsChanges (DesiredBalanceMetrics .NodeWeightStats weights , NodeWeightsDiff nextWeightsDiff ) {}
33+ record NodesWeightsChanges (DesiredBalanceMetrics .NodeWeightStats baseWeights , NodeWeightsDiff weightsDiff ) {}
3434
3535 /**
3636 * Represents the change of shard balance weights for a node, comparing an older DesiredBalance with the latest DesiredBalance.
@@ -41,6 +41,15 @@ record NodesWeightsChanges(DesiredBalanceMetrics.NodeWeightStats weights, NodeWe
4141 * @param totalWeightDiff How much more, or less, the total weight is of shards assigned to the node in the latest DesiredBalance.
4242 */
4343 record NodeWeightsDiff (long shardCountDiff , double diskUsageInBytesDiff , double writeLoadDiff , double totalWeightDiff ) {
44+
45+ /**
46+ * Creates a diff where the {@code base} weights will be subtracted from the {@code next} weights, to show the changes made to reach
47+ * the {@code next} weights.
48+ *
49+ * @param base has the original weights
50+ * @param next has the new weights
51+ * @return The diff of ({@code next} - {@code base})
52+ */
4453 public static NodeWeightsDiff create (DesiredBalanceMetrics .NodeWeightStats base , DesiredBalanceMetrics .NodeWeightStats next ) {
4554 return new NodeWeightsDiff (
4655 next .shardCount () - base .shardCount (),
@@ -49,6 +58,18 @@ public static NodeWeightsDiff create(DesiredBalanceMetrics.NodeWeightStats base,
4958 next .nodeWeight () - base .nodeWeight ()
5059 );
5160 }
61+
62+ /**
63+ * Creates a new {@link NodeWeightsDiff} summing this instance's values with {@code otherDiff}'s values.
64+ */
65+ public NodeWeightsDiff combine (NodeWeightsDiff otherDiff ) {
66+ return new NodeWeightsDiff (
67+ this .shardCountDiff + otherDiff .shardCountDiff ,
68+ this .diskUsageInBytesDiff + otherDiff .diskUsageInBytesDiff ,
69+ this .writeLoadDiff + otherDiff .writeLoadDiff ,
70+ this .totalWeightDiff + otherDiff .totalWeightDiff
71+ );
72+ }
5273 }
5374
5475 @ Override
@@ -83,45 +104,44 @@ public record CombinedBalancingRoundSummary(
83104
84105 public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary (0 , new HashMap <>(), 0 );
85106
107+ /**
108+ * Merges multiple {@link BalancingRoundSummary} summaries into a single {@link CombinedBalancingRoundSummary}.
109+ */
86110 public static CombinedBalancingRoundSummary combine (List <BalancingRoundSummary > summaries ) {
87111 if (summaries .isEmpty ()) {
88112 return EMPTY_RESULTS ;
89113 }
90114
91- // Initialize the combined weight changes with the oldest changes. We can then build the combined changes by adding the diffs of
92- // newer weight changes. If a new node gets added in a later summary, then we will initialize its weights starting there.
93- // Similarly, a node may be removed in a later summary: in this case we will keep that nodes work, up until it was removed.
94- var iterator = summaries .iterator ();
95- assert iterator .hasNext ();
96- var firstSummary = iterator .next ();
97- Map <String , NodesWeightsChanges > combinedNodeNameToWeightChanges = new HashMap <>(firstSummary .nodeNameToWeightChanges );
115+ // We will loop through the summaries and sum the weight diffs for each node entry.
116+ Map <String , NodesWeightsChanges > combinedNodeNameToWeightChanges = new HashMap <>();
98117
99118 // Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
100119 // possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
101- long numberOfShardMoves = firstSummary . numberOfShardsToMove ;
120+ long numberOfShardMoves = 0 ;
102121
103- // Initialize with 1 because we've already begun to iterate the summaries .
104- int numSummaries = 1 ;
122+ // Total number of summaries that are being combined .
123+ int numSummaries = 0 ;
105124
106- // Iterate any remaining summaries (after the first one).
125+ var iterator = summaries . iterator ();
107126 while (iterator .hasNext ()) {
108127 var summary = iterator .next ();
128+
129+ // We'll build the weight changes by keeping the node weight base from the first summary in which a node appears and then
130+ // summing the weight diffs in each summary to get total weight diffs across summaries.
109131 for (var nodeNameAndWeights : summary .nodeNameToWeightChanges .entrySet ()) {
110132 var combined = combinedNodeNameToWeightChanges .get (nodeNameAndWeights .getKey ());
111133 if (combined == null ) {
112- // Encountered a new node in a later summary. Add the new node initializing it with the base weights from that
113- // summary.
134+ // Either this is the first summary, and combinedNodeNameToWeightChanges hasn't been initialized yet for this node;
135+ // or a later balancing round had a new node. Either way, initialize the node entry with the weight changes from the
136+ // first summary in which it appears.
114137 combinedNodeNameToWeightChanges .put (nodeNameAndWeights .getKey (), nodeNameAndWeights .getValue ());
115138 } else {
116- var newCombinedDiff = new NodeWeightsDiff (
117- combined .nextWeightsDiff .shardCountDiff + nodeNameAndWeights .getValue ().nextWeightsDiff .shardCountDiff ,
118- combined .nextWeightsDiff .diskUsageInBytesDiff + nodeNameAndWeights
119- .getValue ().nextWeightsDiff .diskUsageInBytesDiff ,
120- combined .nextWeightsDiff .writeLoadDiff + nodeNameAndWeights .getValue ().nextWeightsDiff .writeLoadDiff ,
121- combined .nextWeightsDiff .totalWeightDiff + nodeNameAndWeights .getValue ().nextWeightsDiff .totalWeightDiff
139+ // We have at least two summaries containing this node, so let's combine them.
140+ var newCombinedChanges = new NodesWeightsChanges (
141+ combined .baseWeights ,
142+ combined .weightsDiff .combine (nodeNameAndWeights .getValue ().weightsDiff ())
122143 );
123- var newCombinedChanges = new NodesWeightsChanges (combined .weights , newCombinedDiff );
124- combinedNodeNameToWeightChanges .compute (nodeNameAndWeights .getKey (), (k , weightChanges ) -> newCombinedChanges );
144+ combinedNodeNameToWeightChanges .put (nodeNameAndWeights .getKey (), newCombinedChanges );
125145 }
126146 }
127147
0 commit comments