Skip to content

Commit 69150c8

Browse files
authored
Export desired balance node weight and its components as metrics (#115854)
Note that this includes only the three node-level weight components (out of the four), as we were not sure how to aggregate and expose the index-specific component and how useful it will be at all. - some of the weight components are also recalculated or exposed as stats (not APM metrics) else where (e.g. `AllocationStatsService`) but since they are available right where we calculate the weight (which we also want), I have just exported all of them together. - How to pass the weight from the BalancedAllocator which is used as a delegated allocator in the desired balance allocator, and from there to the reconciler where we publish, could probably also be done differently, but using `RoutingNodes` and `DesiredBalance` seemed to make more sense to me. Not sure if it is blasphemy for those more familiar with the allocation code! - I liked the `DesiredBalanceMetrics` and how its used so I tried to clean up its existing usage a bit and colocate the new metrics. Relates ES-9866
1 parent 6cf4536 commit 69150c8

File tree

9 files changed

+255
-21
lines changed

9 files changed

+255
-21
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerMetricsIT.java

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import org.elasticsearch.cluster.node.DiscoveryNode;
1213
import org.elasticsearch.common.util.CollectionUtils;
1314
import org.elasticsearch.plugins.Plugin;
1415
import org.elasticsearch.plugins.PluginsService;
@@ -17,10 +18,16 @@
1718
import org.hamcrest.Matcher;
1819

1920
import java.util.Collection;
21+
import java.util.stream.Collectors;
2022

2123
import static org.hamcrest.Matchers.empty;
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
26+
import static org.hamcrest.Matchers.in;
27+
import static org.hamcrest.Matchers.is;
2228
import static org.hamcrest.Matchers.not;
2329

30+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
2431
public class DesiredBalanceReconcilerMetricsIT extends ESIntegTestCase {
2532

2633
@Override
@@ -31,6 +38,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3138
public void testDesiredBalanceGaugeMetricsAreOnlyPublishedByCurrentMaster() throws Exception {
3239
internalCluster().ensureAtLeastNumDataNodes(2);
3340
prepareCreate("test").setSettings(indexSettings(2, 1)).get();
41+
indexRandom(randomBoolean(), "test", between(50, 100));
3442
ensureGreen();
3543

3644
assertOnlyMasterIsPublishingMetrics();
@@ -45,6 +53,59 @@ public void testDesiredBalanceGaugeMetricsAreOnlyPublishedByCurrentMaster() thro
4553
}
4654
}
4755

56+
public void testDesiredBalanceNodeWeightMetrics() {
57+
internalCluster().startNodes(2);
58+
prepareCreate("test").setSettings(indexSettings(2, 1)).get();
59+
indexRandom(randomBoolean(), "test", between(50, 100));
60+
ensureGreen();
61+
final var telemetryPlugin = getTelemetryPlugin(internalCluster().getMasterName());
62+
telemetryPlugin.collect();
63+
assertThat(telemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNASSIGNED_SHARDS_METRIC_NAME), not(empty()));
64+
assertThat(telemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.TOTAL_SHARDS_METRIC_NAME), not(empty()));
65+
assertThat(telemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNDESIRED_ALLOCATION_COUNT_METRIC_NAME), not(empty()));
66+
assertThat(telemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.UNDESIRED_ALLOCATION_RATIO_METRIC_NAME), not(empty()));
67+
68+
var nodeIds = internalCluster().clusterService().state().nodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
69+
var nodeNames = internalCluster().clusterService().state().nodes().stream().map(DiscoveryNode::getName).collect(Collectors.toSet());
70+
71+
final var nodeWeightsMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
72+
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_WEIGHT_METRIC_NAME
73+
);
74+
assertThat(nodeWeightsMetrics.size(), equalTo(2));
75+
for (var nodeStat : nodeWeightsMetrics) {
76+
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
77+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
78+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
79+
}
80+
final var nodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
81+
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME
82+
);
83+
assertThat(nodeShardCountMetrics.size(), equalTo(2));
84+
for (var nodeStat : nodeShardCountMetrics) {
85+
assertThat(nodeStat.value().longValue(), equalTo(2L));
86+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
87+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
88+
}
89+
final var nodeWriteLoadMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
90+
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_WRITE_LOAD_METRIC_NAME
91+
);
92+
assertThat(nodeWriteLoadMetrics.size(), equalTo(2));
93+
for (var nodeStat : nodeWriteLoadMetrics) {
94+
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
95+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
96+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
97+
}
98+
final var nodeDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
99+
DesiredBalanceMetrics.DESIRED_BALANCE_NODE_DISK_USAGE_METRIC_NAME
100+
);
101+
assertThat(nodeDiskUsageMetrics.size(), equalTo(2));
102+
for (var nodeStat : nodeDiskUsageMetrics) {
103+
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
104+
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
105+
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
106+
}
107+
}
108+
48109
private static void assertOnlyMasterIsPublishingMetrics() {
49110
String masterNodeName = internalCluster().getMasterName();
50111
String[] nodeNames = internalCluster().getNodeNames();
@@ -54,16 +115,33 @@ private static void assertOnlyMasterIsPublishingMetrics() {
54115
}
55116

56117
private static void assertMetricsAreBeingPublished(String nodeName, boolean shouldBePublishing) {
57-
final TestTelemetryPlugin testTelemetryPlugin = internalCluster().getInstance(PluginsService.class, nodeName)
58-
.filterPlugins(TestTelemetryPlugin.class)
59-
.findFirst()
60-
.orElseThrow();
118+
final TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(nodeName);
61119
testTelemetryPlugin.resetMeter();
62120
testTelemetryPlugin.collect();
63121
Matcher<Collection<?>> matcher = shouldBePublishing ? not(empty()) : empty();
64122
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNASSIGNED_SHARDS_METRIC_NAME), matcher);
65123
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.TOTAL_SHARDS_METRIC_NAME), matcher);
66124
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNDESIRED_ALLOCATION_COUNT_METRIC_NAME), matcher);
67125
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.UNDESIRED_ALLOCATION_RATIO_METRIC_NAME), matcher);
126+
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_WEIGHT_METRIC_NAME), matcher);
127+
assertThat(
128+
testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_WRITE_LOAD_METRIC_NAME),
129+
matcher
130+
);
131+
assertThat(
132+
testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_DISK_USAGE_METRIC_NAME),
133+
matcher
134+
);
135+
assertThat(
136+
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.DESIRED_BALANCE_NODE_SHARD_COUNT_METRIC_NAME),
137+
matcher
138+
);
139+
}
140+
141+
private static TestTelemetryPlugin getTelemetryPlugin(String nodeName) {
142+
return internalCluster().getInstance(PluginsService.class, nodeName)
143+
.filterPlugins(TestTelemetryPlugin.class)
144+
.findFirst()
145+
.orElseThrow();
68146
}
69147
}

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.node.DiscoveryNodes;
1717
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
1818
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
19+
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
1920
import org.elasticsearch.common.collect.Iterators;
2021
import org.elasticsearch.common.util.Maps;
2122
import org.elasticsearch.core.Assertions;
@@ -76,6 +77,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
7677
private final Map<String, Set<String>> attributeValuesByAttribute;
7778
private final Map<String, Recoveries> recoveriesPerNode;
7879

80+
private Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> balanceWeightStatsPerNode;
81+
7982
/**
8083
* Creates an immutable instance from the {@link RoutingTable} and {@link DiscoveryNodes} found in a cluster state. Used to initialize
8184
* the routing nodes in {@link ClusterState#getRoutingNodes()}. This method should not be used directly, use
@@ -89,6 +92,14 @@ public static RoutingNodes mutable(RoutingTable routingTable, DiscoveryNodes dis
8992
return new RoutingNodes(routingTable, discoveryNodes, false);
9093
}
9194

95+
public void setBalanceWeightStatsPerNode(Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> weightStatsPerNode) {
96+
this.balanceWeightStatsPerNode = weightStatsPerNode;
97+
}
98+
99+
public Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> getBalanceWeightStatsPerNode() {
100+
return balanceWeightStatsPerNode;
101+
}
102+
92103
private RoutingNodes(RoutingTable routingTable, DiscoveryNodes discoveryNodes, boolean readOnly) {
93104
this.readOnly = readOnly;
94105
this.recoveriesPerNode = new HashMap<>();
@@ -97,6 +108,7 @@ private RoutingNodes(RoutingTable routingTable, DiscoveryNodes discoveryNodes, b
97108
this.unassignedShards = new UnassignedShards(this);
98109
this.attributeValuesByAttribute = Collections.synchronizedMap(new HashMap<>());
99110

111+
balanceWeightStatsPerNode = Maps.newMapWithExpectedSize(discoveryNodes.getDataNodes().size());
100112
nodesToShards = Maps.newMapWithExpectedSize(discoveryNodes.getDataNodes().size());
101113
// fill in the nodeToShards with the "live" nodes
102114
var dataNodes = discoveryNodes.getDataNodes().keySet();

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.ClusterInfo;
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.cluster.metadata.Metadata;
19+
import org.elasticsearch.cluster.node.DiscoveryNode;
1920
import org.elasticsearch.cluster.routing.RoutingNode;
2021
import org.elasticsearch.cluster.routing.RoutingNodes;
2122
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -159,6 +160,25 @@ public void allocate(RoutingAllocation allocation) {
159160
balancer.allocateUnassigned();
160161
balancer.moveShards();
161162
balancer.balance();
163+
164+
collectAndRecordNodeWeightStats(balancer, weightFunction, allocation);
165+
}
166+
167+
private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction weightFunction, RoutingAllocation allocation) {
168+
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> nodeLevelWeights = new HashMap<>();
169+
for (var entry : balancer.nodes.entrySet()) {
170+
var node = entry.getValue();
171+
nodeLevelWeights.put(
172+
node.routingNode.node(),
173+
new DesiredBalanceMetrics.NodeWeightStats(
174+
node.numShards(),
175+
node.diskUsageInBytes(),
176+
node.writeLoad(),
177+
weightFunction.nodeWeight(balancer, node)
178+
)
179+
);
180+
}
181+
allocation.routingNodes().setBalanceWeightStatsPerNode(nodeLevelWeights);
162182
}
163183

164184
@Override
@@ -275,11 +295,15 @@ private static class WeightFunction {
275295
}
276296

277297
float weight(Balancer balancer, ModelNode node, String index) {
278-
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
279298
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
299+
return nodeWeight(balancer, node) + theta1 * weightIndex;
300+
}
301+
302+
float nodeWeight(Balancer balancer, ModelNode node) {
303+
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
280304
final float ingestLoad = (float) (node.writeLoad() - balancer.avgWriteLoadPerNode());
281305
final float diskUsage = (float) (node.diskUsageInBytes() - balancer.avgDiskUsageInBytesPerNode());
282-
return theta0 * weightShard + theta1 * weightIndex + theta2 * ingestLoad + theta3 * diskUsage;
306+
return theta0 * weightShard + theta2 * ingestLoad + theta3 * diskUsage;
283307
}
284308

285309
float minWeightDelta(Balancer balancer, String index) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalance.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

12+
import org.elasticsearch.cluster.node.DiscoveryNode;
1213
import org.elasticsearch.common.util.set.Sets;
1314
import org.elasticsearch.index.shard.ShardId;
1415

@@ -19,8 +20,18 @@
1920
* The desired balance of the cluster, indicating which nodes should hold a copy of each shard.
2021
*
2122
* @param assignments a set of the (persistent) node IDs to which each {@link ShardId} should be allocated
23+
* @param weightsPerNode The node weights calculated based on
24+
* {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction#nodeWeight}
2225
*/
23-
public record DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
26+
public record DesiredBalance(
27+
long lastConvergedIndex,
28+
Map<ShardId, ShardAssignment> assignments,
29+
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> weightsPerNode
30+
) {
31+
32+
public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
33+
this(lastConvergedIndex, assignments, Map.of());
34+
}
2435

2536
public static final DesiredBalance INITIAL = new DesiredBalance(-1, Map.of());
2637

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ public DesiredBalance compute(
368368
}
369369

370370
long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
371-
return new DesiredBalance(lastConvergedIndex, assignments);
371+
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode());
372372
}
373373

374374
private static Map<ShardId, ShardAssignment> collectShardAssignments(RoutingNodes routingNodes) {

0 commit comments

Comments
 (0)