Skip to content

Commit 5a94657

Browse files
committed
Added metrics consolidation, correct diff calculation, and some tests.
1 parent 840b003 commit 5a94657

File tree

2 files changed

+69
-151
lines changed

2 files changed

+69
-151
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundMetrics.java

Lines changed: 33 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingRoundSummary.NodesWeightsChanges;
1313
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
14+
import org.elasticsearch.telemetry.metric.LongCounter;
1415
import org.elasticsearch.telemetry.metric.LongWithAttributes;
1516
import org.elasticsearch.telemetry.metric.MeterRegistry;
1617

@@ -25,26 +26,23 @@
2526
*/
2627
public class AllocationBalancingRoundMetrics {
2728

29+
// counters that measure rounds and moves from the last balancing round
2830
public static final String NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME = "es.allocator.balancing_round.balancing_rounds";
29-
3031
public static final String NUMBER_OF_SHARD_MOVES_METRIC_NAME = "es.allocator.balancing_round.shard_moves";
3132

33+
// gauges that measure current utilization
3234
public static final String NUMBER_OF_SHARDS_METRIC_NAME = "es.allocator.balancing_round.shard_count";
33-
public static final String NUMBER_OF_SHARDS_DELTA_METRIC_NAME = "es.allocator.balancing_round.shard_count_delta";
34-
3535
public static final String DISK_USAGE_BYTES_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes";
36-
public static final String DISK_USAGE_BYTES_DELTA_METRIC_NAME = "es.allocator.balancing_round.disk_usage_bytes_delta";
37-
3836
public static final String WRITE_LOAD_METRIC_NAME = "es.allocator.balancing_round.write_load";
39-
public static final String WRITE_LOAD_DELTA_METRIC_NAME = "es.allocator.balancing_round.write_load_delta";
40-
4137
public static final String TOTAL_WEIGHT_METRIC_NAME = "es.allocator.balancing_round.total_weight";
42-
public static final String TOTAL_WEIGHT_DELTA_METRIC_NAME = "es.allocator.balancing_round.total_weight_delta";
38+
39+
private final LongCounter balancingRoundCounter;
40+
private final LongCounter shardMovesCounter;
4341

4442
/**
4543
* The current view of the last period's summary
4644
*/
47-
private final AtomicReference<BalancingRoundSummary.CombinedBalancingRoundSummary> combinedSummariesRef = new AtomicReference<>();
45+
private final AtomicReference<Map<String, NodesWeightsChanges>> nodeNameToWeightChangesRef = new AtomicReference<>();
4846

4947
/**
5048
* Whether metrics sending is enabled
@@ -56,44 +54,16 @@ public class AllocationBalancingRoundMetrics {
5654
private final MeterRegistry meterRegistry;
5755

5856
public AllocationBalancingRoundMetrics(MeterRegistry meterRegistry) {
59-
this.combinedSummariesRef.set(BalancingRoundSummary.CombinedBalancingRoundSummary.EMPTY_RESULTS);
6057
this.meterRegistry = meterRegistry;
6158

62-
meterRegistry.registerLongsGauge(
63-
NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME,
64-
"Current number of balancing rounds",
65-
"unit",
66-
this::getBalancingRounds
67-
);
68-
69-
meterRegistry.registerLongsGauge(
70-
NUMBER_OF_SHARD_MOVES_METRIC_NAME,
71-
"Current number of shard moves",
72-
"{shard}",
73-
this::getShardMoves
74-
);
59+
this.balancingRoundCounter = meterRegistry.registerLongCounter(NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME, "Current number of balancing rounds", "unit");
60+
this.shardMovesCounter = meterRegistry.registerLongCounter(NUMBER_OF_SHARD_MOVES_METRIC_NAME, "Current number of shard moves", "{shard}");
61+
this.nodeNameToWeightChangesRef.set(Map.of());
7562

7663
meterRegistry.registerLongsGauge(NUMBER_OF_SHARDS_METRIC_NAME, "Current number of shards", "unit", this::getShardCount);
77-
meterRegistry.registerLongsGauge(
78-
NUMBER_OF_SHARDS_DELTA_METRIC_NAME,
79-
"Current number of shards delta",
80-
"{shard}",
81-
this::getShardCountDelta
82-
);
83-
8464
meterRegistry.registerDoublesGauge(DISK_USAGE_BYTES_METRIC_NAME, "Disk usage in bytes", "unit", this::getDiskUsage);
85-
meterRegistry.registerDoublesGauge(
86-
DISK_USAGE_BYTES_DELTA_METRIC_NAME,
87-
"Disk usage delta in bytes",
88-
"{shard}",
89-
this::getDiskUsageDelta
90-
);
91-
9265
meterRegistry.registerDoublesGauge(WRITE_LOAD_METRIC_NAME, "Write load", "1.0", this::getWriteLoad);
93-
meterRegistry.registerDoublesGauge(WRITE_LOAD_DELTA_METRIC_NAME, "Write load delta", "1.0", this::getWriteLoadDelta);
94-
9566
meterRegistry.registerDoublesGauge(TOTAL_WEIGHT_METRIC_NAME, "Total weight", "1.0", this::getTotalWeight);
96-
meterRegistry.registerDoublesGauge(TOTAL_WEIGHT_DELTA_METRIC_NAME, "Total weight delta", "1.0", this::getTotalWeightDelta);
9767
}
9868

9969
public void setEnableSending(boolean enableSending) {
@@ -102,63 +72,33 @@ public void setEnableSending(boolean enableSending) {
10272

10373
public void updateBalancingRoundMetrics(BalancingRoundSummary.CombinedBalancingRoundSummary summary) {
10474
assert summary != null : "balancing round metrics cannot be null";
105-
combinedSummariesRef.set(summary);
75+
76+
nodeNameToWeightChangesRef.set(summary.nodeNameToWeightChanges());
77+
if (enableSending) {
78+
balancingRoundCounter.incrementBy(summary.numberOfBalancingRounds());
79+
shardMovesCounter.incrementBy(summary.numberOfShardMoves());
80+
}
10681
}
10782

10883
public void clearBalancingRoundMetrics() {
109-
combinedSummariesRef.set(BalancingRoundSummary.CombinedBalancingRoundSummary.EMPTY_RESULTS);
84+
nodeNameToWeightChangesRef.set(Map.of());
11085
}
11186

11287
private Map<String, Object> getNodeAttributes(String nodeId) {
11388
return Map.of("node_id", nodeId);
11489
}
11590

116-
private List<LongWithAttributes> getBalancingRounds() {
117-
if (enableSending == false) {
118-
return Collections.emptyList();
119-
}
120-
121-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
122-
LongWithAttributes result = new LongWithAttributes(combinedSummary.numberOfShardMoves());
123-
return List.of(result);
124-
}
125-
126-
private List<LongWithAttributes> getShardMoves() {
127-
if (enableSending == false) {
128-
return Collections.emptyList();
129-
}
130-
131-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
132-
LongWithAttributes result = new LongWithAttributes(combinedSummary.numberOfShardMoves());
133-
return List.of(result);
134-
}
135-
13691
private List<LongWithAttributes> getShardCount() {
13792
if (enableSending == false) {
13893
return Collections.emptyList();
13994
}
14095

141-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
142-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
96+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = nodeNameToWeightChangesRef.get();
14397
List<LongWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
14498
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
145-
metrics.add(new LongWithAttributes(nodeWeights.getValue().baseWeights().shardCount(), getNodeAttributes(nodeWeights.getKey())));
146-
}
147-
return metrics;
148-
}
149-
150-
private List<LongWithAttributes> getShardCountDelta() {
151-
if (enableSending == false) {
152-
return Collections.emptyList();
153-
}
154-
155-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
156-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
157-
List<LongWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
158-
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
159-
metrics.add(
160-
new LongWithAttributes(nodeWeights.getValue().weightsDiff().shardCountDiff(), getNodeAttributes(nodeWeights.getKey()))
161-
);
99+
NodesWeightsChanges nodeWeightChanges = nodeWeights.getValue();
100+
long shardCount = nodeWeightChanges.baseWeights().shardCount() + nodeWeightChanges.weightsDiff().shardCountDiff();
101+
metrics.add(new LongWithAttributes(shardCount, getNodeAttributes(nodeWeights.getKey())));
162102
}
163103
return metrics;
164104
}
@@ -168,32 +108,12 @@ private List<DoubleWithAttributes> getDiskUsage() {
168108
return Collections.emptyList();
169109
}
170110

171-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
172-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
173-
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
174-
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
175-
metrics.add(
176-
new DoubleWithAttributes(nodeWeights.getValue().baseWeights().diskUsageInBytes(), getNodeAttributes(nodeWeights.getKey()))
177-
);
178-
}
179-
return metrics;
180-
}
181-
182-
private List<DoubleWithAttributes> getDiskUsageDelta() {
183-
if (enableSending == false) {
184-
return Collections.emptyList();
185-
}
186-
187-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
188-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
111+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = nodeNameToWeightChangesRef.get();
189112
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
190113
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
191-
metrics.add(
192-
new DoubleWithAttributes(
193-
nodeWeights.getValue().weightsDiff().diskUsageInBytesDiff(),
194-
getNodeAttributes(nodeWeights.getKey())
195-
)
196-
);
114+
NodesWeightsChanges nodeWeightChanges = nodeWeights.getValue();
115+
double diskUsage = nodeWeightChanges.baseWeights().diskUsageInBytes() + nodeWeightChanges.weightsDiff().diskUsageInBytesDiff();
116+
metrics.add(new DoubleWithAttributes(diskUsage, getNodeAttributes(nodeWeights.getKey())));
197117
}
198118
return metrics;
199119
}
@@ -203,29 +123,12 @@ private List<DoubleWithAttributes> getWriteLoad() {
203123
return Collections.emptyList();
204124
}
205125

206-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
207-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
126+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = nodeNameToWeightChangesRef.get();
208127
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
209128
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
210-
metrics.add(
211-
new DoubleWithAttributes(nodeWeights.getValue().baseWeights().writeLoad(), getNodeAttributes(nodeWeights.getKey()))
212-
);
213-
}
214-
return metrics;
215-
}
216-
217-
private List<DoubleWithAttributes> getWriteLoadDelta() {
218-
if (enableSending == false) {
219-
return Collections.emptyList();
220-
}
221-
222-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
223-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
224-
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
225-
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
226-
metrics.add(
227-
new DoubleWithAttributes(nodeWeights.getValue().weightsDiff().writeLoadDiff(), getNodeAttributes(nodeWeights.getKey()))
228-
);
129+
NodesWeightsChanges nodeWeightChanges = nodeWeights.getValue();
130+
double writeLoad = nodeWeightChanges.baseWeights().writeLoad() + nodeWeightChanges.weightsDiff().writeLoadDiff();
131+
metrics.add(new DoubleWithAttributes(writeLoad, getNodeAttributes(nodeWeights.getKey())));
229132
}
230133
return metrics;
231134
}
@@ -235,29 +138,12 @@ private List<DoubleWithAttributes> getTotalWeight() {
235138
return Collections.emptyList();
236139
}
237140

238-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
239-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
240-
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
241-
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
242-
metrics.add(
243-
new DoubleWithAttributes(nodeWeights.getValue().baseWeights().nodeWeight(), getNodeAttributes(nodeWeights.getKey()))
244-
);
245-
}
246-
return metrics;
247-
}
248-
249-
private List<DoubleWithAttributes> getTotalWeightDelta() {
250-
if (enableSending == false) {
251-
return Collections.emptyList();
252-
}
253-
254-
final BalancingRoundSummary.CombinedBalancingRoundSummary combinedSummary = combinedSummariesRef.get();
255-
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = combinedSummary.nodeNameToWeightChanges();
141+
Map<String, NodesWeightsChanges> nodeNameToWeightChanges = nodeNameToWeightChangesRef.get();
256142
List<DoubleWithAttributes> metrics = new ArrayList<>(nodeNameToWeightChanges.size());
257143
for (var nodeWeights : nodeNameToWeightChanges.entrySet()) {
258-
metrics.add(
259-
new DoubleWithAttributes(nodeWeights.getValue().weightsDiff().totalWeightDiff(), getNodeAttributes(nodeWeights.getKey()))
260-
);
144+
NodesWeightsChanges nodeWeightChanges = nodeWeights.getValue();
145+
double totalWeight = nodeWeightChanges.baseWeights().nodeWeight() + nodeWeightChanges.weightsDiff().totalWeightDiff();
146+
metrics.add(new DoubleWithAttributes(totalWeight, getNodeAttributes(nodeWeights.getKey())));
261147
}
262148
return metrics;
263149
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/AllocationBalancingRoundSummaryServiceTests.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,22 @@
1717
import org.elasticsearch.common.settings.Settings;
1818
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
1919
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.telemetry.InstrumentType;
21+
import org.elasticsearch.telemetry.Measurement;
22+
import org.elasticsearch.telemetry.RecordingMeterRegistry;
2023
import org.elasticsearch.test.ESTestCase;
2124
import org.elasticsearch.test.MockLog;
2225
import org.elasticsearch.threadpool.ThreadPool;
2326
import org.junit.Before;
2427

28+
import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME;
29+
import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_SHARD_MOVES_METRIC_NAME;
30+
import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.NUMBER_OF_SHARDS_METRIC_NAME;
31+
import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.DISK_USAGE_BYTES_METRIC_NAME;
32+
import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.WRITE_LOAD_METRIC_NAME;
33+
import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundMetrics.TOTAL_WEIGHT_METRIC_NAME;
34+
35+
import java.util.List;
2536
import java.util.Map;
2637
import java.util.Set;
2738

@@ -86,7 +97,9 @@ public void setUpThreadPool() {
8697
* {@link AllocationBalancingRoundSummaryService#ENABLE_BALANCER_ROUND_SUMMARIES_SETTING} defaults to false.
8798
*/
8899
public void testServiceDisabledByDefault() {
89-
var service = new AllocationBalancingRoundSummaryService(testThreadPool, disabledDefaultEmptyClusterSettings);
100+
var recordingMeterRegistry = new RecordingMeterRegistry();
101+
var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry);
102+
var service = new AllocationBalancingRoundSummaryService(testThreadPool, disabledDefaultEmptyClusterSettings, balancingRoundMetrics);
90103

91104
try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) {
92105
/**
@@ -110,11 +123,16 @@ public void testServiceDisabledByDefault() {
110123
deterministicTaskQueue.runAllRunnableTasks();
111124
mockLog.awaitAllExpectationsMatched();
112125
service.verifyNumberOfSummaries(0);
126+
List<Measurement> measurements = recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME);
127+
assertEquals(measurements.size(), 0);
113128
}
114129
}
115130

116131
public void testEnabledService() {
117-
var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings);
132+
var recordingMeterRegistry = new RecordingMeterRegistry();
133+
var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry);
134+
var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings, balancingRoundMetrics);
135+
balancingRoundMetrics.setEnableSending(true);
118136

119137
try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) {
120138
/**
@@ -156,14 +174,20 @@ public void testEnabledService() {
156174
deterministicTaskQueue.runAllRunnableTasks();
157175
mockLog.awaitAllExpectationsMatched();
158176
service.verifyNumberOfSummaries(0);
177+
178+
List<Measurement> measurements = recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME);
179+
assertEquals(measurements.size(), 2);
159180
}
160181
}
161182

162183
/**
163184
* The service should combine multiple summaries together into a single report when multiple summaries were added since the last report.
164185
*/
165186
public void testCombinedSummary() {
166-
var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings);
187+
var recordingMeterRegistry = new RecordingMeterRegistry();
188+
var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry);
189+
var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings, balancingRoundMetrics);
190+
balancingRoundMetrics.setEnableSending(true);
167191

168192
try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) {
169193
service.addBalancerRoundSummary(new BalancingRoundSummary(NODE_NAME_TO_WEIGHT_CHANGES, 50));
@@ -182,14 +206,19 @@ public void testCombinedSummary() {
182206
deterministicTaskQueue.runAllRunnableTasks();
183207
mockLog.awaitAllExpectationsMatched();
184208
service.verifyNumberOfSummaries(0);
209+
210+
List<Measurement> measurements = recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME);
211+
assertEquals(measurements.size(), 2);
185212
}
186213
}
187214

188215
/**
189216
* The service shouldn't log anything when there haven't been any summaries added since the last report.
190217
*/
191218
public void testNoSummariesToReport() {
192-
var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings);
219+
var recordingMeterRegistry = new RecordingMeterRegistry();
220+
var balancingRoundMetrics = new AllocationBalancingRoundMetrics(recordingMeterRegistry);
221+
var service = new AllocationBalancingRoundSummaryService(testThreadPool, enabledClusterSettings, balancingRoundMetrics);
193222

194223
try (var mockLog = MockLog.capture(AllocationBalancingRoundSummaryService.class)) {
195224
/**
@@ -229,6 +258,9 @@ public void testNoSummariesToReport() {
229258
deterministicTaskQueue.runAllRunnableTasks();
230259
mockLog.awaitAllExpectationsMatched();
231260
service.verifyNumberOfSummaries(0);
261+
262+
List<Measurement> measurements = recordingMeterRegistry.getRecorder().getMeasurements(InstrumentType.LONG_COUNTER, NUMBER_OF_BALANCING_ROUNDS_METRIC_NAME);
263+
assertEquals(measurements.size(), 0);
232264
}
233265
}
234266

0 commit comments

Comments
 (0)