Skip to content

Commit e9f899e

Browse files
authored
Add current node weight as an APM metric (#117557)
As discussed previously, the current node weight (calculated the same way as in the desired balance computation) might also be useful to have as a metric. The difference is that the current node weight is calculated from the current cluster state rather than from the internal state of the BalancedShardsAllocator (i.e. Balancer and ModelNode). To share the weight calculation logic, I had to move the weight function and a few related utilities out of BalancedShardsAllocator into a standalone WeightFunction class. NodeAllocationStatsProvider is still shared by both the AllocationStatsService and the desired balance metric collection. Relates ES-10080
1 parent 2bc1b4f commit e9f899e

File tree

12 files changed

+297
-149
lines changed

12 files changed

+297
-149
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ public void testDesiredBalanceMetrics() {
117117
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
118118
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
119119
}
120+
final var currentNodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
121+
DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME
122+
);
123+
assertThat(currentNodeWeightsMetrics.size(), equalTo(2));
124+
for (var nodeStat : currentNodeWeightsMetrics) {
125+
assertTrue(nodeStat.isDouble());
126+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
127+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
128+
}
120129
final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
121130
DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME
122131
);
@@ -196,6 +205,7 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou
196205
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME),
197206
matcher
198207
);
208+
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WEIGHT_METRIC_NAME), matcher);
199209
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher);
200210
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
201211
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher);

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public ClusterModule(
139139
this.clusterPlugins = clusterPlugins;
140140
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
141141
this.allocationDeciders = new AllocationDeciders(deciderList);
142-
var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster);
142+
var nodeAllocationStatsProvider = new NodeAllocationStatsProvider(writeLoadForecaster, clusterService.getClusterSettings());
143143
this.shardsAllocator = createShardsAllocator(
144144
settings,
145145
clusterService.getClusterSettings(),

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Map;
1919
import java.util.function.Supplier;
20+
import java.util.stream.Collectors;
2021

2122
public class AllocationStatsService {
2223
private final ClusterService clusterService;
@@ -39,6 +40,26 @@ public AllocationStatsService(
3940
}
4041

4142
public Map<String, NodeAllocationStats> stats() {
42-
return nodeAllocationStatsProvider.stats(clusterService.state(), clusterInfoService.getClusterInfo(), desiredBalanceSupplier.get());
43+
var state = clusterService.state();
44+
var stats = nodeAllocationStatsProvider.stats(
45+
state.metadata(),
46+
state.getRoutingNodes(),
47+
clusterInfoService.getClusterInfo(),
48+
desiredBalanceSupplier.get()
49+
);
50+
return stats.entrySet()
51+
.stream()
52+
.collect(
53+
Collectors.toMap(
54+
Map.Entry::getKey,
55+
e -> new NodeAllocationStats(
56+
e.getValue().shards(),
57+
e.getValue().undesiredShards(),
58+
e.getValue().forecastedIngestLoad(),
59+
e.getValue().forecastedDiskUsage(),
60+
e.getValue().currentDiskUsage()
61+
)
62+
)
63+
);
4364
}
4465
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsProvider.java

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@
1010
package org.elasticsearch.cluster.routing.allocation;
1111

1212
import org.elasticsearch.cluster.ClusterInfo;
13-
import org.elasticsearch.cluster.ClusterState;
1413
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.Metadata;
1515
import org.elasticsearch.cluster.routing.RoutingNode;
16+
import org.elasticsearch.cluster.routing.RoutingNodes;
1617
import org.elasticsearch.cluster.routing.ShardRouting;
18+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
1719
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
20+
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
21+
import org.elasticsearch.common.settings.ClusterSettings;
1822
import org.elasticsearch.common.util.Maps;
1923
import org.elasticsearch.core.Nullable;
2024

@@ -23,17 +27,47 @@
2327
public class NodeAllocationStatsProvider {
2428
private final WriteLoadForecaster writeLoadForecaster;
2529

26-
public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster) {
30+
private volatile float indexBalanceFactor;
31+
private volatile float shardBalanceFactor;
32+
private volatile float writeLoadBalanceFactor;
33+
private volatile float diskUsageBalanceFactor;
34+
35+
public record NodeAllocationAndClusterBalanceStats(
36+
int shards,
37+
int undesiredShards,
38+
double forecastedIngestLoad,
39+
long forecastedDiskUsage,
40+
long currentDiskUsage,
41+
float currentNodeWeight
42+
) {}
43+
44+
public NodeAllocationStatsProvider(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
2745
this.writeLoadForecaster = writeLoadForecaster;
46+
clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
47+
clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
48+
clusterSettings.initializeAndWatch(
49+
BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING,
50+
value -> this.writeLoadBalanceFactor = value
51+
);
52+
clusterSettings.initializeAndWatch(
53+
BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
54+
value -> this.diskUsageBalanceFactor = value
55+
);
2856
}
2957

30-
public Map<String, NodeAllocationStats> stats(
31-
ClusterState clusterState,
58+
public Map<String, NodeAllocationAndClusterBalanceStats> stats(
59+
Metadata metadata,
60+
RoutingNodes routingNodes,
3261
ClusterInfo clusterInfo,
3362
@Nullable DesiredBalance desiredBalance
3463
) {
35-
var stats = Maps.<String, NodeAllocationStats>newMapWithExpectedSize(clusterState.getRoutingNodes().size());
36-
for (RoutingNode node : clusterState.getRoutingNodes()) {
64+
var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
65+
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
66+
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
67+
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);
68+
69+
var stats = Maps.<String, NodeAllocationAndClusterBalanceStats>newMapWithExpectedSize(routingNodes.size());
70+
for (RoutingNode node : routingNodes) {
3771
int shards = 0;
3872
int undesiredShards = 0;
3973
double forecastedWriteLoad = 0.0;
@@ -44,7 +78,7 @@ public Map<String, NodeAllocationStats> stats(
4478
continue;
4579
}
4680
shards++;
47-
IndexMetadata indexMetadata = clusterState.metadata().getIndexSafe(shardRouting.index());
81+
IndexMetadata indexMetadata = metadata.getIndexSafe(shardRouting.index());
4882
if (isDesiredAllocation(desiredBalance, shardRouting) == false) {
4983
undesiredShards++;
5084
}
@@ -54,14 +88,23 @@ public Map<String, NodeAllocationStats> stats(
5488
currentDiskUsage += shardSize;
5589

5690
}
91+
float currentNodeWeight = weightFunction.nodeWeight(
92+
shards,
93+
avgShardsPerNode,
94+
forecastedWriteLoad,
95+
avgWriteLoadPerNode,
96+
currentDiskUsage,
97+
avgDiskUsageInBytesPerNode
98+
);
5799
stats.put(
58100
node.nodeId(),
59-
new NodeAllocationStats(
101+
new NodeAllocationAndClusterBalanceStats(
60102
shards,
61103
desiredBalance != null ? undesiredShards : -1,
62104
forecastedWriteLoad,
63105
forecastedDiskUsage,
64-
currentDiskUsage
106+
currentDiskUsage,
107+
currentNodeWeight
65108
)
66109
);
67110
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 13 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,17 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w
168168
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> nodeLevelWeights = new HashMap<>();
169169
for (var entry : balancer.nodes.entrySet()) {
170170
var node = entry.getValue();
171+
var nodeWeight = weightFunction.nodeWeight(
172+
node.numShards(),
173+
balancer.avgShardsPerNode(),
174+
node.writeLoad(),
175+
balancer.avgWriteLoadPerNode(),
176+
node.diskUsageInBytes(),
177+
balancer.avgDiskUsageInBytesPerNode()
178+
);
171179
nodeLevelWeights.put(
172180
node.routingNode.node(),
173-
new DesiredBalanceMetrics.NodeWeightStats(
174-
node.numShards(),
175-
node.diskUsageInBytes(),
176-
node.writeLoad(),
177-
weightFunction.nodeWeight(balancer, node)
178-
)
181+
new DesiredBalanceMetrics.NodeWeightStats(node.numShards(), node.diskUsageInBytes(), node.writeLoad(), nodeWeight)
179182
);
180183
}
181184
allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights);
@@ -252,65 +255,6 @@ public float getShardBalance() {
252255
return shardBalanceFactor;
253256
}
254257

255-
/**
256-
* This class is the primary weight function used to create balanced over nodes and shards in the cluster.
257-
* Currently this function has 3 properties:
258-
* <ul>
259-
* <li><code>index balance</code> - balance property over shards per index</li>
260-
* <li><code>shard balance</code> - balance property over shards per cluster</li>
261-
* </ul>
262-
* <p>
263-
* Each of these properties are expressed as factor such that the properties factor defines the relative
264-
* importance of the property for the weight function. For example if the weight function should calculate
265-
* the weights only based on a global (shard) balance the index balance can be set to {@code 0.0} and will
266-
* in turn have no effect on the distribution.
267-
* </p>
268-
* The weight per index is calculated based on the following formula:
269-
* <ul>
270-
* <li>
271-
* <code>weight<sub>index</sub>(node, index) = indexBalance * (node.numShards(index) - avgShardsPerNode(index))</code>
272-
* </li>
273-
* <li>
274-
* <code>weight<sub>node</sub>(node, index) = shardBalance * (node.numShards() - avgShardsPerNode)</code>
275-
* </li>
276-
* </ul>
277-
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
278-
*/
279-
private static class WeightFunction {
280-
281-
private final float theta0;
282-
private final float theta1;
283-
private final float theta2;
284-
private final float theta3;
285-
286-
WeightFunction(float shardBalance, float indexBalance, float writeLoadBalance, float diskUsageBalance) {
287-
float sum = shardBalance + indexBalance + writeLoadBalance + diskUsageBalance;
288-
if (sum <= 0.0f) {
289-
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
290-
}
291-
theta0 = shardBalance / sum;
292-
theta1 = indexBalance / sum;
293-
theta2 = writeLoadBalance / sum;
294-
theta3 = diskUsageBalance / sum;
295-
}
296-
297-
float weight(Balancer balancer, ModelNode node, String index) {
298-
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
299-
return nodeWeight(balancer, node) + theta1 * weightIndex;
300-
}
301-
302-
float nodeWeight(Balancer balancer, ModelNode node) {
303-
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
304-
final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
305-
final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
306-
return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage;
307-
}
308-
309-
float minWeightDelta(Balancer balancer, String index) {
310-
return theta0 * 1 + theta1 * 1 + theta2 * balancer.getShardWriteLoad(index) + theta3 * balancer.maxShardSizeBytes(index);
311-
}
312-
}
313-
314258
/**
315259
* A {@link Balancer}
316260
*/
@@ -335,63 +279,13 @@ private Balancer(WriteLoadForecaster writeLoadForecaster, RoutingAllocation allo
335279
this.metadata = allocation.metadata();
336280
this.weight = weight;
337281
this.threshold = threshold;
338-
avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
339-
avgWriteLoadPerNode = getTotalWriteLoad(writeLoadForecaster, metadata) / routingNodes.size();
340-
avgDiskUsageInBytesPerNode = ((double) getTotalDiskUsageInBytes(allocation.clusterInfo(), metadata) / routingNodes.size());
282+
avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
283+
avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
284+
avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
341285
nodes = Collections.unmodifiableMap(buildModelFromAssigned());
342286
sorter = newNodeSorter();
343287
}
344288

345-
private static double getTotalWriteLoad(WriteLoadForecaster writeLoadForecaster, Metadata metadata) {
346-
double writeLoad = 0.0;
347-
for (IndexMetadata indexMetadata : metadata.indices().values()) {
348-
writeLoad += getIndexWriteLoad(writeLoadForecaster, indexMetadata);
349-
}
350-
return writeLoad;
351-
}
352-
353-
private static double getIndexWriteLoad(WriteLoadForecaster writeLoadForecaster, IndexMetadata indexMetadata) {
354-
var shardWriteLoad = writeLoadForecaster.getForecastedWriteLoad(indexMetadata).orElse(0.0);
355-
return shardWriteLoad * numberOfCopies(indexMetadata);
356-
}
357-
358-
private static long getTotalDiskUsageInBytes(ClusterInfo clusterInfo, Metadata metadata) {
359-
long totalDiskUsageInBytes = 0;
360-
for (IndexMetadata indexMetadata : metadata.indices().values()) {
361-
totalDiskUsageInBytes += getIndexDiskUsageInBytes(clusterInfo, indexMetadata);
362-
}
363-
return totalDiskUsageInBytes;
364-
}
365-
366-
// Visible for testing
367-
static long getIndexDiskUsageInBytes(ClusterInfo clusterInfo, IndexMetadata indexMetadata) {
368-
if (indexMetadata.ignoreDiskWatermarks()) {
369-
// disk watermarks are ignored for partial searchable snapshots
370-
// and is equivalent to indexMetadata.isPartialSearchableSnapshot()
371-
return 0;
372-
}
373-
final long forecastedShardSize = indexMetadata.getForecastedShardSizeInBytes().orElse(-1L);
374-
long totalSizeInBytes = 0;
375-
int shardCount = 0;
376-
for (int shard = 0; shard < indexMetadata.getNumberOfShards(); shard++) {
377-
final ShardId shardId = new ShardId(indexMetadata.getIndex(), shard);
378-
final long primaryShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, true, -1L));
379-
if (primaryShardSize != -1L) {
380-
totalSizeInBytes += primaryShardSize;
381-
shardCount++;
382-
}
383-
final long replicaShardSize = Math.max(forecastedShardSize, clusterInfo.getShardSize(shardId, false, -1L));
384-
if (replicaShardSize != -1L) {
385-
totalSizeInBytes += replicaShardSize * indexMetadata.getNumberOfReplicas();
386-
shardCount += indexMetadata.getNumberOfReplicas();
387-
}
388-
}
389-
if (shardCount == numberOfCopies(indexMetadata)) {
390-
return totalSizeInBytes;
391-
}
392-
return shardCount == 0 ? 0 : (totalSizeInBytes / shardCount) * numberOfCopies(indexMetadata);
393-
}
394-
395289
private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
396290
if (indexMetadata.ignoreDiskWatermarks()) {
397291
// disk watermarks are ignored for partial searchable snapshots
@@ -401,10 +295,6 @@ private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMet
401295
return Math.max(indexMetadata.getForecastedShardSizeInBytes().orElse(0L), clusterInfo.getShardSize(shardRouting, 0L));
402296
}
403297

404-
private static int numberOfCopies(IndexMetadata indexMetadata) {
405-
return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
406-
}
407-
408298
private float getShardWriteLoad(String index) {
409299
return (float) writeLoadForecaster.getForecastedWriteLoad(metadata.index(index)).orElse(0.0);
410300
}
@@ -1433,7 +1323,7 @@ public float weight(ModelNode node) {
14331323
}
14341324

14351325
public float minWeightDelta() {
1436-
return function.minWeightDelta(balancer, index);
1326+
return function.minWeightDelta(balancer.getShardWriteLoad(index), balancer.maxShardSizeBytes(index));
14371327
}
14381328

14391329
@Override

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
*
2222
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
2323
* @param weightsPerNode The node weights calculated based on
24-
* {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight}
24+
* {@link org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction#nodeWeight}
2525
*/
2626
public record DesiredBalance(
2727
long lastConvergedIndex,

0 commit comments

Comments
 (0)