Skip to content

Commit bef60ec

Browse files
committed
Break down allocation stats by tier for IngestMetricsService
1 parent e614d67 commit bef60ec

File tree

5 files changed

+79
-29
lines changed

5 files changed

+79
-29
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetrics.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,45 @@ public class DesiredBalanceMetrics {
3232

3333
/**
3434
* @param unassignedShards Shards that are not assigned to any node.
35+
* @param allocationStatsByRole A breakdown of the allocations stats by {@link ShardRouting.Role}
36+
*/
37+
public record AllocationStats(long unassignedShards, Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole) {
38+
39+
public AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
40+
this(
41+
unassignedShards,
42+
Map.of(ShardRouting.Role.DEFAULT, new RoleAllocationStats(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes))
43+
);
44+
}
45+
46+
public long totalAllocations() {
47+
return allocationStatsByRole.values().stream().mapToLong(RoleAllocationStats::totalAllocations).sum();
48+
}
49+
50+
public long undesiredAllocationsExcludingShuttingDownNodes() {
51+
return allocationStatsByRole.values()
52+
.stream()
53+
.mapToLong(RoleAllocationStats::undesiredAllocationsExcludingShuttingDownNodes)
54+
.sum();
55+
}
56+
}
57+
58+
/**
3559
* @param totalAllocations Shards that are assigned to a node.
3660
* @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource
3761
* constraint per the {@link AllocationDeciders}. Excludes shards that must move
3862
* because of a node shutting down.
39-
* @param undesiredAllocationsExcludingShuttingDownNodesByRole A breakdown of the undesired allocations by {@link ShardRouting.Role}
4063
*/
41-
public record AllocationStats(
42-
long unassignedShards,
43-
long totalAllocations,
44-
long undesiredAllocationsExcludingShuttingDownNodes,
45-
Map<ShardRouting.Role, Long> undesiredAllocationsExcludingShuttingDownNodesByRole
46-
) {}
64+
public record RoleAllocationStats(long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {
65+
public static final RoleAllocationStats EMPTY = new RoleAllocationStats(0L, 0L);
66+
67+
public float undesiredAllocationsRatio() {
68+
if (totalAllocations == 0) {
69+
return 0f;
70+
}
71+
return undesiredAllocationsExcludingShuttingDownNodes / (float) totalAllocations;
72+
}
73+
}
4774

4875
public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
4976
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
@@ -78,7 +105,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
78105
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
79106
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";
80107

81-
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1, Map.of());
108+
public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, Map.of());
82109

83110
private volatile boolean nodeIsMaster = false;
84111

@@ -98,6 +125,12 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
98125
*/
99126
private volatile long undesiredAllocationsExcludingShuttingDownNodes;
100127

128+
/**
129+
* A breakdown of shards assigned and the undesired allocations from the last reconciliation
130+
* broken down by {@link ShardRouting.Role}.
131+
*/
132+
private volatile Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole;
133+
101134
private final AtomicReference<Map<DiscoveryNode, NodeWeightStats>> weightStatsPerNodeRef = new AtomicReference<>(Map.of());
102135
private final AtomicReference<Map<DiscoveryNode, NodeAllocationStatsAndWeight>> allocationStatsPerNodeRef = new AtomicReference<>(
103136
Map.of()
@@ -112,8 +145,9 @@ public void updateMetrics(
112145
assert weightStatsPerNode != null : "node balance weight stats cannot be null";
113146
if (allocationStats != EMPTY_ALLOCATION_STATS) {
114147
this.unassignedShards = allocationStats.unassignedShards;
115-
this.totalAllocations = allocationStats.totalAllocations;
116-
this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes;
148+
this.totalAllocations = allocationStats.totalAllocations();
149+
this.undesiredAllocationsExcludingShuttingDownNodes = allocationStats.undesiredAllocationsExcludingShuttingDownNodes();
150+
this.allocationStatsByRole = allocationStats.allocationStatsByRole();
117151
}
118152
weightStatsPerNodeRef.set(weightStatsPerNode);
119153
allocationStatsPerNodeRef.set(nodeAllocationStats);
@@ -223,6 +257,10 @@ public long undesiredAllocations() {
223257
return undesiredAllocationsExcludingShuttingDownNodes;
224258
}
225259

260+
public Map<ShardRouting.Role, RoleAllocationStats> allocationStatsByRole() {
261+
return allocationStatsByRole;
262+
}
263+
226264
private List<LongWithAttributes> getUnassignedShardsMetrics() {
227265
return getIfPublishing(unassignedShards);
228266
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconciler.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
529529
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
530530
int totalAllocations = 0;
531531
int undesiredAllocationsExcludingShuttingDownNodes = 0;
532+
final ObjectLongMap<ShardRouting.Role> totalAllocationsByRole = new ObjectLongHashMap<>();
532533
final ObjectLongMap<ShardRouting.Role> undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>();
533534

534535
// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
@@ -538,6 +539,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
538539
final var shardRouting = iterator.next();
539540

540541
totalAllocations++;
542+
totalAllocationsByRole.addTo(shardRouting.role(), 1);
541543

542544
if (shardRouting.started() == false) {
543545
// can only rebalance started shards
@@ -600,10 +602,16 @@ private DesiredBalanceMetrics.AllocationStats balance() {
600602
maybeLogUndesiredAllocationsWarning(totalAllocations, undesiredAllocationsExcludingShuttingDownNodes, routingNodes.size());
601603
return new DesiredBalanceMetrics.AllocationStats(
602604
unassignedShards,
603-
totalAllocations,
604-
undesiredAllocationsExcludingShuttingDownNodes,
605-
StreamSupport.stream(undesiredAllocationsExcludingShuttingDownNodesByRole.spliterator(), false)
606-
.collect(Collectors.toUnmodifiableMap(lc -> lc.key, lc -> lc.value))
605+
StreamSupport.stream(totalAllocationsByRole.spliterator(), false)
606+
.collect(
607+
Collectors.toUnmodifiableMap(
608+
lc -> lc.key,
609+
lc -> new DesiredBalanceMetrics.RoleAllocationStats(
610+
totalAllocationsByRole.get(lc.key),
611+
undesiredAllocationsExcludingShuttingDownNodesByRole.get(lc.key)
612+
)
613+
)
614+
)
607615
);
608616
}
609617

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,10 @@ public DesiredBalanceStats getStats() {
438438
);
439439
}
440440

441+
public Map<ShardRouting.Role, DesiredBalanceMetrics.RoleAllocationStats> getAllocationStatsByRole() {
442+
return desiredBalanceMetrics.allocationStatsByRole();
443+
}
444+
441445
private void onNoLongerMaster() {
442446
if (indexGenerator.getAndSet(-1) != -1) {
443447
currentDesiredBalanceRef.set(DesiredBalance.NOT_MASTER);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceMetricsTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public void testZeroAllMetrics() {
2727
long unassignedShards = randomNonNegativeLong();
2828
long totalAllocations = randomNonNegativeLong();
2929
long undesiredAllocations = randomNonNegativeLong();
30-
metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of());
30+
metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of());
3131
assertEquals(totalAllocations, metrics.totalAllocations());
3232
assertEquals(unassignedShards, metrics.unassignedShards());
3333
assertEquals(undesiredAllocations, metrics.undesiredAllocations());
@@ -44,7 +44,7 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() {
4444
long unassignedShards = randomNonNegativeLong();
4545
long totalAllocations = randomLongBetween(100, 10000000);
4646
long undesiredAllocations = randomLongBetween(0, totalAllocations);
47-
metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of());
47+
metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of());
4848

4949
// Collect when not master
5050
meterRegistry.getRecorder().collect();
@@ -104,7 +104,7 @@ public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() {
104104
RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
105105
DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry);
106106
long unassignedShards = randomNonNegativeLong();
107-
metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0, Map.of()), Map.of(), Map.of());
107+
metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0), Map.of(), Map.of());
108108

109109
metrics.setNodeIsMaster(true);
110110
meterRegistry.getRecorder().collect();

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
281281
assertTrue(index1RoutingTable.primaryShard().unassigned());
282282
assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
283283
assertNotNull(allocationStats.get());
284-
assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0, Map.of()), allocationStats.get());
284+
assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0), allocationStats.get());
285285
}
286286

287287
// now relax the filter so that the replica of index-0 and the primary of index-1 can both be assigned to node-1, but the throttle
@@ -296,7 +296,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
296296
assertTrue(index1RoutingTable.primaryShard().initializing());
297297
assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
298298
assertNotNull(allocationStats.get());
299-
assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0, Map.of()), allocationStats.get());
299+
assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0), allocationStats.get());
300300
}
301301

302302
final var stateWithStartedPrimariesAndInitializingReplica = startInitializingShardsAndReroute(
@@ -313,7 +313,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
313313
assertTrue(index1RoutingTable.primaryShard().started());
314314
assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
315315
assertNotNull(allocationStats.get());
316-
assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0, Map.of()), allocationStats.get());
316+
assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0), allocationStats.get());
317317
}
318318
}
319319

@@ -910,7 +910,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
910910
assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1"));
911911
}
912912
assertNotNull(allocationStats);
913-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get());
913+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
914914

915915
// Only allow allocation on two of the nodes, excluding the other two nodes.
916916
clusterSettings.applySettings(
@@ -926,7 +926,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
926926
assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no
927927
// movement needed
928928
assertNotNull(allocationStats);
929-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get());
929+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
930930

931931
desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3")));
932932

@@ -937,12 +937,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
937937
assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
938938
assertNotNull(allocationStats);
939939
// Total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice.
940-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get());
940+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get());
941941

942942
// Ensuring that we check the shortcut two-param canAllocate() method up front
943943
canAllocateRef.set(Decision.NO);
944944
assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
945-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get());
945+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
946946
canAllocateRef.set(Decision.YES);
947947

948948
// Restore filter to default
@@ -980,7 +980,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
980980
"test",
981981
ActionListener.noop()
982982
);
983-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3, Map.of(ShardRouting.Role.DEFAULT, 3L)), allocationStats.get());
983+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3), allocationStats.get());
984984

985985
assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
986986
}
@@ -1048,7 +1048,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
10481048

10491049
// All still on desired nodes, no movement needed, cluster state remains the same.
10501050
assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
1051-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get());
1051+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
10521052

10531053
desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3")));
10541054

@@ -1076,15 +1076,15 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
10761076
assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
10771077
assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
10781078
assertNotNull(allocationStats.get());
1079-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get());
1079+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
10801080

10811081
// Test that the AllocationStats are still updated, even though throttling is active. The cluster state should remain unchanged
10821082
// because due to throttling: the previous reroute request started relocating two shards and, since those reallocations have not
10831083
// been completed, no additional shard relocations can begin.
10841084
assertSame(reroutedState, allocationService.reroute(reroutedState, "test", ActionListener.noop()));
10851085
assertNotNull(allocationStats);
10861086
// Note: total allocations counts relocating and intializing shards, so the two relocating shards will be counted twice.
1087-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get());
1087+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get());
10881088
}
10891089

10901090
public void testDoNotRebalanceToTheNodeThatNoLongerExists() {
@@ -1293,7 +1293,7 @@ public void testRebalanceDoesNotCauseHotSpots() {
12931293

12941294
var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING);
12951295
if (initializing.isEmpty()) {
1296-
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0, Map.of()), allocationStats);
1296+
assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0), allocationStats);
12971297
break;
12981298
}
12991299

0 commit comments

Comments
 (0)