Skip to content

Commit 6704f4f

Browse files
committed
allocation: add balancer round summary as metrics
This commit exposes the BalancingRoundSummary as a collection of APM/OpenTelemetry metrics. The same summaries are already written to the log. The combined summary, which is collected every ten seconds or so, is stored as the current state in the telemetry metrics class (AllocationBalancingRoundMetrics). Whenever telemetry collection runs, each metric reads its value from that current state.
1 parent de75028 commit 6704f4f

File tree

9 files changed

+328
-21
lines changed

9 files changed

+328
-21
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
4141
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
4242
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
43+
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics;
4344
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
4445
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
4546
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
@@ -140,6 +141,7 @@ public class ClusterModule extends AbstractModule {
140141
private final AllocationStatsService allocationStatsService;
141142
private final TelemetryProvider telemetryProvider;
142143
private final DesiredBalanceMetrics desiredBalanceMetrics;
144+
private final AllocationBalancingRoundMetrics balancingRoundMetrics;
143145

144146
public ClusterModule(
145147
Settings settings,
@@ -167,6 +169,7 @@ public ClusterModule(
167169
balancingWeightsFactory
168170
);
169171
this.desiredBalanceMetrics = new DesiredBalanceMetrics(telemetryProvider.getMeterRegistry());
172+
this.balancingRoundMetrics = new AllocationBalancingRoundMetrics(telemetryProvider.getMeterRegistry());
170173
this.shardsAllocator = createShardsAllocator(
171174
settings,
172175
clusterService.getClusterSettings(),
@@ -179,7 +182,8 @@ public ClusterModule(
179182
writeLoadForecaster,
180183
nodeAllocationStatsAndWeightsCalculator,
181184
this::explainShardAllocation,
182-
desiredBalanceMetrics
185+
desiredBalanceMetrics,
186+
balancingRoundMetrics
183187
);
184188
this.clusterService = clusterService;
185189
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadPool.getThreadContext(), systemIndices, projectResolver);
@@ -510,7 +514,8 @@ private static ShardsAllocator createShardsAllocator(
510514
WriteLoadForecaster writeLoadForecaster,
511515
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator,
512516
ShardAllocationExplainer shardAllocationExplainer,
513-
DesiredBalanceMetrics desiredBalanceMetrics
517+
DesiredBalanceMetrics desiredBalanceMetrics,
518+
AllocationBalancingRoundMetrics balancingRoundMetrics
514519
) {
515520
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
516521
allocators.put(
@@ -527,7 +532,8 @@ private static ShardsAllocator createShardsAllocator(
527532
reconciler,
528533
nodeAllocationStatsAndWeightsCalculator,
529534
shardAllocationExplainer,
530-
desiredBalanceMetrics
535+
desiredBalanceMetrics,
536+
balancingRoundMetrics
531537
)
532538
);
533539

@@ -572,6 +578,7 @@ protected void configure() {
572578
bind(AllocationStatsService.class).toInstance(allocationStatsService);
573579
bind(TelemetryProvider.class).toInstance(telemetryProvider);
574580
bind(DesiredBalanceMetrics.class).toInstance(desiredBalanceMetrics);
581+
bind(AllocationBalancingRoundMetrics.class).toInstance(balancingRoundMetrics);
575582
bind(MetadataRolloverService.class).asEagerSingleton();
576583
}
577584

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
13+
import org.elasticsearch.telemetry.metric.LongWithAttributes;
14+
import org.elasticsearch.telemetry.metric.MeterRegistry;
15+
16+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundSummary.NodesWeightsChanges;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.Supplier;
25+
import java.util.function.ToLongFunction;
26+
27+
/**
28+
* A telemetry metrics sender for {@link BalancingRoundSummary.CombinedBalancingRoundSummary}
29+
*/
30+
public class AllocationBalancingRoundMetrics {
31+
32+
public static final String NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME = "es.allocator.balancing_round.balancing_rounds";
33+
public static final String NUMBER_OF_SHARD_MOVES_METRIC_NAME = "es.allocator.balancing_round.shard_moves";
34+
35+
public static final String NUMBER_OF_SHARDS_METRIC_NAME = "es.allocator.balancing_round.shard_count";
36+
public static final String NUMBER_OF_SHARDS_DELTA_METRIC_NAME = "es.allocator.balancing_round.shard_count_delta";
37+
38+
public static final String DISK_USAGE_BYTES_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes";
39+
public static final String DISK_USAGE_BYTES_DELTA_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes_delta";
40+
41+
public static final String WRITE_LOAD_METRIC_NAME = "es.allocator.balancing_round.write_load";
42+
public static final String WRITE_LOAD_DELTA_METRIC_NAME = "es.allocator.balancing_round.write_load_delta";
43+
44+
public static final String TOTAL_WEIGHT_METRIC_NAME = "es.allocator.balancing_round.total_weight";
45+
public static final String TOTAL_WEIGHT_DELTA_METRIC_NAME = "es.allocator.balancing_round.total_weight_delta";
46+
47+
/**
48+
* The current view of the last period's summary
49+
*/
50+
private final AtomicReference<BalancingRoundSummary.CombinedBalancingRoundSummary> combinedSummariesRef = new AtomicReference<>();
51+
52+
public static final AllocationBalancingRoundMetrics NOOP = new AllocationBalancingRoundMetrics(MeterRegistry.NOOP);
53+
54+
private final MeterRegistry meterRegistry;
55+
56+
public AllocationBalancingRoundMetrics(MeterRegistry meterRegistry) {
57+
this.meterRegistry = meterRegistry;
58+
59+
meterRegistry.registerLongsGauge(
60+
NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME,
61+
"Current number of balancing rounds",
62+
"unit",
63+
this::getBalancingRounds
64+
);
65+
66+
meterRegistry.registerLongsGauge(
67+
NUMBER_OF_SHARD_MOVES_METRIC_NAME,
68+
"Current number of shard moves",
69+
"{shard}",
70+
this::getShardMoves
71+
);
72+
73+
meterRegistry.registerLongsGauge(
74+
NUMBER_OF_SHARDS_METRIC_NAME,
75+
"Current number of shards",
76+
"unit",
77+
this::getShardCount
78+
);
79+
meterRegistry.registerLongsGauge(
80+
NUMBER_OF_SHARDS_DELTA_METRIC_NAME,
81+
"Current number of shard moves",
82+
"{shard}",
83+
this::getShardCountDelta
84+
);
85+
86+
meterRegistry.registerDoublesGauge(
87+
DISK_USAGE_BYTES_METRIC_NAME,
88+
"Disk usage in bytes",
89+
"unit",
90+
this::getDiskUsage
91+
);
92+
meterRegistry.registerDoublesGauge(
93+
DISK_USAGE_BYTES_DELTA_METRIC_NAME,
94+
"Disk usage delta in bytes",
95+
"{shard}",
96+
this::getDiskUsageDelta
97+
);
98+
99+
meterRegistry.registerDoublesGauge(
100+
WRITE_LOAD_METRIC_NAME,
101+
"Write load",
102+
"1.0",
103+
this::getWriteLoad
104+
);
105+
meterRegistry.registerDoublesGauge(
106+
WRITE_LOAD_DELTA_METRIC_NAME,
107+
"Write load",
108+
"1.0",
109+
this::getWriteLoadDelta
110+
);
111+
112+
meterRegistry.registerDoublesGauge(
113+
TOTAL_WEIGHT_METRIC_NAME,
114+
"Total weight",
115+
"1.0",
116+
this::getTotalWeight
117+
);
118+
meterRegistry.registerDoublesGauge(
119+
TOTAL_WEIGHT_DELTA_METRIC_NAME,
120+
"Total weight delta",
121+
"1.0",
122+
this::getTotalWeightDelta
123+
);
124+
}
125+
126+
public void updateRoundMetrics(BalancingRoundSummary.CombinedBalancingRoundSummary summary) {
127+
combinedSummariesRef.set(summary);
128+
}
129+
130+
public void clearRoundMetrics() {
131+
combinedSummariesRef.set(null);
132+
}
133+
134+
private Map<String, Object> getNodeAttributes(String nodeId) {
135+
return Map.of("node_id", nodeId);
136+
}
137+
138+
private List<LongWithAttributes> getBalancingRounds() {
139+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
140+
if (combinedSummary == null) {
141+
return Collections.emptyList();
142+
}
143+
LongWithAttributes result = new LongWithAttributes(combinedSummary.numberOfShardMoves());
144+
return List.of(result);
145+
}
146+
147+
private List<LongWithAttributes> getShardMoves() {
148+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
149+
if (combinedSummary == null) {
150+
return Collections.emptyList();
151+
}
152+
LongWithAttributes result = new LongWithAttributes(combinedSummary.numberOfShardMoves());
153+
return List.of(result);
154+
}
155+
156+
private List<LongWithAttributes> getShardCount() {
157+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
158+
if (combinedSummary == null) {
159+
return Collections.emptyList();
160+
}
161+
162+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
163+
List<LongWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
164+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
165+
metrics.add(new LongWithAttributes(nodeWeights.getValue().baseWeights().shardCount(), getNodeAttributes(nodeWeights.getKey())));
166+
}
167+
return metrics;
168+
}
169+
170+
private List<LongWithAttributes> getShardCountDelta() {
171+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
172+
if (combinedSummary == null) {
173+
return Collections.emptyList();
174+
}
175+
176+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
177+
List<LongWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
178+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
179+
metrics.add(new LongWithAttributes(nodeWeights.getValue().weightsDiff().shardCountDiff(), getNodeAttributes(nodeWeights.getKey())));
180+
}
181+
return metrics;
182+
}
183+
184+
private List<DoubleWithAttributes> getDiskUsage() {
185+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
186+
if (combinedSummary == null) {
187+
return Collections.emptyList();
188+
}
189+
190+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
191+
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
192+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
193+
metrics.add(new DoubleWithAttributes(nodeWeights.getValue().baseWeights().diskUsageInBytes(), getNodeAttributes(nodeWeights.getKey())));
194+
}
195+
return metrics;
196+
}
197+
198+
private List<DoubleWithAttributes> getDiskUsageDelta() {
199+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
200+
if (combinedSummary == null) {
201+
return Collections.emptyList();
202+
}
203+
204+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
205+
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
206+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
207+
metrics.add(new DoubleWithAttributes(nodeWeights.getValue().weightsDiff().diskUsageInBytesDiff(), getNodeAttributes(nodeWeights.getKey())));
208+
}
209+
return metrics;
210+
}
211+
212+
private List<DoubleWithAttributes> getWriteLoad() {
213+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
214+
if (combinedSummary == null) {
215+
return Collections.emptyList();
216+
}
217+
218+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
219+
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
220+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
221+
metrics.add(new DoubleWithAttributes(nodeWeights.getValue().baseWeights().writeLoad(), getNodeAttributes(nodeWeights.getKey())));
222+
}
223+
return metrics;
224+
}
225+
226+
private List<DoubleWithAttributes> getWriteLoadDelta() {
227+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
228+
if (combinedSummary == null) {
229+
return Collections.emptyList();
230+
}
231+
232+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
233+
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
234+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
235+
metrics.add(new DoubleWithAttributes(nodeWeights.getValue().weightsDiff().writeLoadDiff(), getNodeAttributes(nodeWeights.getKey())));
236+
}
237+
return metrics;
238+
}
239+
240+
private List<DoubleWithAttributes> getTotalWeight() {
241+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
242+
if (combinedSummary == null) {
243+
return Collections.emptyList();
244+
}
245+
246+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
247+
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
248+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
249+
metrics.add(new DoubleWithAttributes(nodeWeights.getValue().baseWeights().nodeWeight(), getNodeAttributes(nodeWeights.getKey())));
250+
}
251+
return metrics;
252+
}
253+
254+
private List<DoubleWithAttributes> getTotalWeightDelta() {
255+
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
256+
if (combinedSummary == null) {
257+
return Collections.emptyList();
258+
}
259+
260+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
261+
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
262+
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
263+
metrics.add(new DoubleWithAttributes(nodeWeights.getValue().weightsDiff().totalWeightDiff(), getNodeAttributes(nodeWeights.getKey())));
264+
}
265+
return metrics;
266+
}
267+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class AllocationBalancingRoundSummaryService {
5858
private final ThreadPool threadPool;
5959
private volatile boolean enableBalancerRoundSummaries;
6060
private volatile TimeValue summaryReportInterval;
61+
private final AllocationBalancingRoundMetrics balancingRoundMetrics;
6162

6263
/**
6364
* A concurrency-safe list of balancing round summaries. Balancer rounds are run and added here serially, so the queue will naturally
@@ -69,11 +70,17 @@ public class AllocationBalancingRoundSummaryService {
6970
private final AtomicReference<Scheduler.Cancellable> scheduledReportFuture = new AtomicReference<>();
7071

7172
/**
 * Convenience constructor for callers that do not supply a metrics instance: delegates to the three-argument constructor,
 * passing {@link AllocationBalancingRoundMetrics#NOOP}.
 */
public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings) {
73+
this(threadPool, clusterSettings, AllocationBalancingRoundMetrics.NOOP);
74+
}
75+
76+
public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSettings clusterSettings,
77+
AllocationBalancingRoundMetrics balancingRoundMetrics) {
7278
this.threadPool = threadPool;
7379
// Initialize the local setting values to avoid a null access when ClusterSettings#initializeAndWatch is called on each setting:
7480
// updating enableBalancerRoundSummaries accesses summaryReportInterval.
7581
this.enableBalancerRoundSummaries = clusterSettings.get(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING);
7682
this.summaryReportInterval = clusterSettings.get(BALANCER_ROUND_SUMMARIES_LOG_INTERVAL_SETTING);
83+
this.balancingRoundMetrics = balancingRoundMetrics;
7784

7885
clusterSettings.initializeAndWatch(ENABLE_BALANCER_ROUND_SUMMARIES_SETTING, value -> {
7986
this.enableBalancerRoundSummaries = value;
@@ -185,6 +192,8 @@ private void drainAndReportSummaries() {
185192
}
186193

187194
logger.info("Balancing round summaries: " + combinedSummaries);
195+
196+
balancingRoundMetrics.updateRoundMetrics(combinedSummaries);
188197
}
189198

190199
/**
@@ -213,6 +222,7 @@ private void updateBalancingRoundSummaryReporting() {
213222
cancelReporting();
214223
// Clear the data structure so that we don't retain unnecessary memory.
215224
drainSummaries();
225+
balancingRoundMetrics.clearRoundMetrics();
216226
}
217227
}
218228

0 commit comments

Comments
 (0)