Skip to content

Commit 16c7ec6

Browse files
Add comments and refactor method names in balancer stats code (#119613)
This change adds comments and renames classes and methods to make the balancer stats code easier to understand. Relates ES-10341
1 parent d37e1bd commit 16c7ec6

File tree

12 files changed

+130
-77
lines changed

12 files changed

+130
-77
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.elasticsearch.cluster.routing.allocation.AllocationService.RerouteStrategy;
3434
import org.elasticsearch.cluster.routing.allocation.AllocationStatsService;
3535
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
36-
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider;
36+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
3737
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
3838
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
3939
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
@@ -139,7 +139,10 @@ public ClusterModule(
139139
this.clusterPlugins = clusterPlugins;
140140
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
141141
this.allocationDeciders = new AllocationDeciders(deciderList);
142-
var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings());
142+
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(
143+
writeLoadForecaster,
144+
clusterService.getClusterSettings()
145+
);
143146
this.shardsAllocator = createShardsAllocator(
144147
settings,
145148
clusterService.getClusterSettings(),
@@ -149,7 +152,7 @@ public ClusterModule(
149152
this::reconcile,
150153
writeLoadForecaster,
151154
telemetryProvider,
152-
nodeAllocationStatsProvider
155+
nodeAllocationStatsAndWeightsCalculator
153156
);
154157
this.clusterService = clusterService;
155158
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices);
@@ -167,7 +170,7 @@ public ClusterModule(
167170
clusterService,
168171
clusterInfoService,
169172
shardsAllocator,
170-
nodeAllocationStatsProvider
173+
nodeAllocationStatsAndWeightsCalculator
171174
);
172175
this.telemetryProvider = telemetryProvider;
173176
}
@@ -409,7 +412,7 @@ private static ShardsAllocator createShardsAllocator(
409412
DesiredBalanceReconcilerAction reconciler,
410413
WriteLoadForecaster writeLoadForecaster,
411414
TelemetryProvider telemetryProvider,
412-
NodeAllocationStatsProvider nodeAllocationStatsProvider
415+
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
413416
) {
414417
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
415418
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(clusterSettings, writeLoadForecaster));
@@ -422,7 +425,7 @@ private static ShardsAllocator createShardsAllocator(
422425
clusterService,
423426
reconciler,
424427
telemetryProvider,
425-
nodeAllocationStatsProvider
428+
nodeAllocationStatsAndWeightsCalculator
426429
)
427430
);
428431

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,35 +19,41 @@
1919
import java.util.function.Supplier;
2020
import java.util.stream.Collectors;
2121

22+
/**
23+
* Exposes cluster allocation metrics. Constructs {@link NodeAllocationStats} per node, on demand.
24+
*/
2225
public class AllocationStatsService {
2326
private final ClusterService clusterService;
2427
private final ClusterInfoService clusterInfoService;
2528
private final Supplier<DesiredBalance> desiredBalanceSupplier;
26-
private final NodeAllocationStatsProvider nodeAllocationStatsProvider;
29+
private final NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator;
2730

2831
public AllocationStatsService(
2932
ClusterService clusterService,
3033
ClusterInfoService clusterInfoService,
3134
ShardsAllocator shardsAllocator,
32-
NodeAllocationStatsProvider nodeAllocationStatsProvider
35+
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
3336
) {
3437
this.clusterService = clusterService;
3538
this.clusterInfoService = clusterInfoService;
36-
this.nodeAllocationStatsProvider = nodeAllocationStatsProvider;
39+
this.nodeAllocationStatsAndWeightsCalculator = nodeAllocationStatsAndWeightsCalculator;
3740
this.desiredBalanceSupplier = shardsAllocator instanceof DesiredBalanceShardsAllocator allocator
3841
? allocator::getDesiredBalance
3942
: () -> null;
4043
}
4144

45+
/**
46+
* Returns a map of node IDs to node allocation stats.
47+
*/
4248
public Map<String, NodeAllocationStats> stats() {
43-
var state = clusterService.state();
44-
var stats = nodeAllocationStatsProvider.stats(
45-
state.metadata(),
46-
state.getRoutingNodes(),
49+
var clusterState = clusterService.state();
50+
var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights(
51+
clusterState.metadata(),
52+
clusterState.getRoutingNodes(),
4753
clusterInfoService.getClusterInfo(),
4854
desiredBalanceSupplier.get()
4955
);
50-
return stats.entrySet()
56+
return nodesStatsAndWeights.entrySet()
5157
.stream()
5258
.collect(
5359
Collectors.toMap(

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStats.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@
1818

1919
import java.io.IOException;
2020

21+
/**
22+
* Point-in-time allocation stats for a particular node.
23+
*
24+
* @param shards count of shards on this node.
25+
* @param undesiredShards count of shards that we want to move off of this node.
26+
* @param forecastedIngestLoad the predicted near future total ingest load on this node.
27+
* @param forecastedDiskUsage the predicted near future total disk usage on this node.
28+
* @param currentDiskUsage the current total disk usage on this node.
29+
*/
2130
public record NodeAllocationStats(
2231
int shards,
2332
int undesiredShards,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java renamed to server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,21 @@
2424

2525
import java.util.Map;
2626

27-
public class NodeAllocationStatsProvider {
27+
/**
28+
* Calculates the allocation weights and usage stats for each node: see {@link NodeAllocationStatsAndWeight} for details.
29+
*/
30+
public class NodeAllocationStatsAndWeightsCalculator {
2831
private final WriteLoadForecaster writeLoadForecaster;
2932

3033
private volatile float indexBalanceFactor;
3134
private volatile float shardBalanceFactor;
3235
private volatile float writeLoadBalanceFactor;
3336
private volatile float diskUsageBalanceFactor;
3437

35-
public record NodeAllocationAndClusterBalanceStats(
38+
/**
39+
* Node shard allocation stats and the total node weight.
40+
*/
41+
public record NodeAllocationStatsAndWeight(
3642
int shards,
3743
int undesiredShards,
3844
double forecastedIngestLoad,
@@ -41,7 +47,7 @@ public record NodeAllocationAndClusterBalanceStats(
4147
float currentNodeWeight
4248
) {}
4349

44-
public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
50+
public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
4551
this.writeLoadForecaster = writeLoadForecaster;
4652
clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
4753
clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
@@ -55,7 +61,10 @@ public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, Clus
5561
);
5662
}
5763

58-
public Map<String, NodeAllocationAndClusterBalanceStats> stats(
64+
/**
65+
* Returns a map of node IDs to {@link NodeAllocationStatsAndWeight}.
66+
*/
67+
public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
5968
Metadata metadata,
6069
RoutingNodes routingNodes,
6170
ClusterInfo clusterInfo,
@@ -66,7 +75,7 @@ public Map<String, NodeAllocationAndClusterBalanceStats> stats(
6675
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
6776
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);
6877

69-
var stats = Maps.<String, NodeAllocationAndClusterBalanceStats>newMapWithExpectedSize(routingNodes.size());
78+
var nodeAllocationStatsAndWeights = Maps.<String, NodeAllocationStatsAndWeight>newMapWithExpectedSize(routingNodes.size());
7079
for (RoutingNode node : routingNodes) {
7180
int shards = 0;
7281
int undesiredShards = 0;
@@ -75,9 +84,10 @@ public Map<String, NodeAllocationAndClusterBalanceStats> stats(
7584
long currentDiskUsage = 0;
7685
for (ShardRouting shardRouting : node) {
7786
if (shardRouting.relocating()) {
87+
// Skip the shard if it is moving off this node. The node running recovery will count it.
7888
continue;
7989
}
80-
shards++;
90+
++shards;
8191
IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index());
8292
if (isDesiredAllocation(desiredBalance, shardRouting) == false) {
8393
undesiredShards++;
@@ -86,20 +96,21 @@ public Map<String, NodeAllocationAndClusterBalanceStats> stats(
8696
forecastedWriteLoad += writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
8797
forecastedDiskUsage += Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0), shardSize);
8898
currentDiskUsage += shardSize;
89-
9099
}
91-
float currentNodeWeight = weightFunction.nodeWeight(
100+
float currentNodeWeight = weightFunction.calculateNodeWeight(
92101
shards,
93102
avgShardsPerNode,
94103
forecastedWriteLoad,
95104
avgWriteLoadPerNode,
96105
currentDiskUsage,
97106
avgDiskUsageInBytesPerNode
98107
);
99-
stats.put(
108+
nodeAllocationStatsAndWeights.put(
100109
node.nodeId(),
101-
new NodeAllocationAndClusterBalanceStats(
110+
new NodeAllocationStatsAndWeight(
102111
shards,
112+
// It's part of a public API contract for an 'undesired_shards' field that -1 will be returned if an allocator other
113+
// than the desired balance allocator is used.
103114
desiredBalance != null ? undesiredShards : -1,
104115
forecastedWriteLoad,
105116
forecastedDiskUsage,
@@ -109,10 +120,13 @@ public Map<String, NodeAllocationAndClusterBalanceStats> stats(
109120
);
110121
}
111122

112-
return stats;
123+
return nodeAllocationStatsAndWeights;
113124
}
114125

115-
private static boolean isDesiredAllocation(DesiredBalance desiredBalance, ShardRouting shardRouting) {
126+
/**
127+
* Checks whether a shard is currently allocated to a node that is wanted by the desired balance decision.
128+
*/
129+
private static boolean isDesiredAllocation(@Nullable DesiredBalance desiredBalance, ShardRouting shardRouting) {
116130
if (desiredBalance == null) {
117131
return true;
118132
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
112112
Property.NodeScope
113113
);
114114

115+
// TODO: deduplicate these fields, use the fields in NodeAllocationStatsAndWeightsCalculator instead.
115116
private volatile float indexBalanceFactor;
116117
private volatile float shardBalanceFactor;
117118
private volatile float writeLoadBalanceFactor;
@@ -168,7 +169,7 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w
168169
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> nodeLevelWeights = new HashMap<>();
169170
for (var entry : balancer.nodes.entrySet()) {
170171
var node = entry.getValue();
171-
var nodeWeight = weightFunction.nodeWeight(
172+
var nodeWeight = weightFunction.calculateNodeWeight(
172173
node.numShards(),
173174
balancer.avgShardsPerNode(),
174175
node.writeLoad(),
@@ -263,7 +264,7 @@ public static class Balancer {
263264
private final RoutingAllocation allocation;
264265
private final RoutingNodes routingNodes;
265266
private final Metadata metadata;
266-
private final WeightFunction weight;
267+
private final WeightFunction weightFunction;
267268

268269
private final float threshold;
269270
private final float avgShardsPerNode;
@@ -272,12 +273,17 @@ public static class Balancer {
272273
private final Map<String, ModelNode> nodes;
273274
private final NodeSorter sorter;
274275

275-
private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allocation, WeightFunction weight, float threshold) {
276+
private Balancer(
277+
WriteLoadForecaster writeLoadForecaster,
278+
RoutingAllocation allocation,
279+
WeightFunction weightFunction,
280+
float threshold
281+
) {
276282
this.writeLoadForecaster = writeLoadForecaster;
277283
this.allocation = allocation;
278284
this.routingNodes = allocation.routingNodes();
279285
this.metadata = allocation.metadata();
280-
this.weight = weight;
286+
this.weightFunction = weightFunction;
281287
this.threshold = threshold;
282288
avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
283289
avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
@@ -355,7 +361,7 @@ public double avgDiskUsageInBytesPerNode() {
355361
* to sort based on an index.
356362
*/
357363
private NodeSorter newNodeSorter() {
358-
return new NodeSorter(nodesArray(), weight, this);
364+
return new NodeSorter(nodesArray(), weightFunction, this);
359365
}
360366

361367
/**
@@ -435,7 +441,7 @@ private MoveDecision decideRebalance(final ShardRouting shard, Decision canRemai
435441

436442
// balance the shard, if a better node can be found
437443
final String idxName = shard.getIndexName();
438-
final float currentWeight = weight.weight(this, currentNode, idxName);
444+
final float currentWeight = weightFunction.calculateNodeWeightWithIndex(this, currentNode, idxName);
439445
final AllocationDeciders deciders = allocation.deciders();
440446
Type rebalanceDecisionType = Type.NO;
441447
ModelNode targetNode = null;
@@ -451,7 +457,7 @@ private MoveDecision decideRebalance(final ShardRouting shard, Decision canRemai
451457
// this is a comparison of the number of shards on this node to the number of shards
452458
// that should be on each node on average (both taking the cluster as a whole into account
453459
// as well as shards per index)
454-
final float nodeWeight = weight.weight(this, node, idxName);
460+
final float nodeWeight = weightFunction.calculateNodeWeightWithIndex(this, node, idxName);
455461
// if the node we are examining has a worse (higher) weight than the node the shard is
456462
// assigned to, then there is no way moving the shard to the node with the worse weight
457463
// can make the balance of the cluster better, so we check for that here
@@ -1028,7 +1034,7 @@ private AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting s
10281034
}
10291035

10301036
// weight of this index currently on the node
1031-
float currentWeight = weight.weight(this, node, shard.getIndexName());
1037+
float currentWeight = weightFunction.calculateNodeWeightWithIndex(this, node, shard.getIndexName());
10321038
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
10331039
if (currentWeight > minWeight && explain == false) {
10341040
continue;
@@ -1319,7 +1325,7 @@ public void reset(String index) {
13191325
}
13201326

13211327
public float weight(ModelNode node) {
1322-
return function.weight(balancer, node, index);
1328+
return function.calculateNodeWeightWithIndex(balancer, node, index);
13231329
}
13241330

13251331
public float minWeightDelta() {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
*
2222
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
2323
* @param weightsPerNode The node weights calculated based on
24-
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight}
24+
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#calculateNodeWeight}
2525
*/
2626
public record DesiredBalance(
2727
long lastConvergedIndex,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

1212
import org.elasticsearch.cluster.node.DiscoveryNode;
13-
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsProvider.NodeAllocationAndClusterBalanceStats;
13+
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight;
1414
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
1515
import org.elasticsearch.telemetry.metric.LongWithAttributes;
1616
import org.elasticsearch.telemetry.metric.MeterRegistry;
@@ -20,6 +20,12 @@
2020
import java.util.Map;
2121
import java.util.concurrent.atomic.AtomicReference;
2222

23+
/**
24+
* Maintains balancer metrics and makes them accessible to the {@link MeterRegistry} and APM reporting. Metrics are updated
25+
* ({@link #updateMetrics}) or cleared ({@link #zeroAllMetrics}) as a result of cluster events and the metrics will be pulled for reporting
26+
* via the MeterRegistry implementation. Only the master node reports metrics: see {@link #setNodeIsMaster}. When
27+
* {@link #nodeIsMaster} is false, empty values are returned such that MeterRegistry ignores the metrics for reporting purposes.
28+
*/
2329
public class DesiredBalanceMetrics {
2430

2531
public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
@@ -69,13 +75,14 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
6975
private volatile long undesiredAllocations;
7076

7177
private final AtomicReference<Map<DiscoveryNode, NodeWeightStats>> weightStatsPerNodeRef = new AtomicReference<>(Map.of());
72-
private final AtomicReference<Map<DiscoveryNode, NodeAllocationAndClusterBalanceStats>> allocationStatsPerNodeRef =
73-
new AtomicReference<>(Map.of());
78+
private final AtomicReference<Map<DiscoveryNode, NodeAllocationStatsAndWeight>> allocationStatsPerNodeRef = new AtomicReference<>(
79+
Map.of()
80+
);
7481

7582
public void updateMetrics(
7683
AllocationStats allocationStats,
7784
Map<DiscoveryNode, NodeWeightStats> weightStatsPerNode,
78-
Map<DiscoveryNode, NodeAllocationAndClusterBalanceStats> nodeAllocationStats
85+
Map<DiscoveryNode, NodeAllocationStatsAndWeight> nodeAllocationStats
7986
) {
8087
assert allocationStats != null : "allocation stats cannot be null";
8188
assert weightStatsPerNode != null : "node balance weight stats cannot be null";
@@ -170,6 +177,10 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
170177
);
171178
}
172179

180+
/**
181+
* When {@link #nodeIsMaster} is set to true, the server will report APM metrics registered in this file. When set to false, empty
182+
* values will be returned such that no APM metrics are sent from this server.
183+
*/
173184
public void setNodeIsMaster(boolean nodeIsMaster) {
174185
this.nodeIsMaster = nodeIsMaster;
175186
}
@@ -339,6 +350,10 @@ private List<DoubleWithAttributes> getUndesiredAllocationsRatioMetrics() {
339350
return List.of();
340351
}
341352

353+
/**
354+
* Sets all the internal class fields to zero/empty. Typically used in conjunction with {@link #setNodeIsMaster}.
355+
* This is best-effort because it is possible for {@link #updateMetrics} to race with this method.
356+
*/
342357
public void zeroAllMetrics() {
343358
unassignedShards = 0;
344359
totalAllocations = 0;

0 commit comments

Comments
 (0)