@@ -10,6 +10,7 @@
package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.node.DiscoveryNode;
+ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
@@ -35,8 +36,14 @@ public class DesiredBalanceMetrics {
* @param undesiredAllocationsExcludingShuttingDownNodes Shards that are assigned to a node but must move to alleviate a resource
* constraint per the {@link AllocationDeciders}. Excludes shards that must move
* because of a node shutting down.
+ * @param undesiredAllocationsExcludingShuttingDownNodesByRole A breakdown of the undesired allocations by {@link ShardRouting.Role}
*/
- public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}
+ public record AllocationStats(
+ long unassignedShards,
+ long totalAllocations,
+ long undesiredAllocationsExcludingShuttingDownNodes,
+ Map<ShardRouting.Role, Long> undesiredAllocationsExcludingShuttingDownNodesByRole
Member

If we're going to collect it all in a map of roles, shouldn't the map also replace totalAllocations and undesiredAllocationsExcludingShuttingDownNodes, since those are the values that the "default" role would have in stateful? But then you get into what to do for the values returned from the desired balance API in serverless. There you'd have to sum up index_only and search_only, I guess, and sprinkle in a bunch of asserts, since index/search and default should be mutually exclusive in this map. Another option is to just add the specific breakdowns we need here? index/searchTierAllocations and index/searchTierUndesiredAllocations? I think you'd need both total and undesired, since we're going to need the ratio in the autoscaler.
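
To make the "sum up and assert" option concrete, here is a minimal sketch, not code from this PR. It assumes a stand-in `Role` enum in place of `ShardRouting.Role`, and the class and method names (`RoleTotalsSketch`, `totalAcrossRoles`, `undesiredRatio`) are hypothetical. It treats DEFAULT as mutually exclusive with INDEX_ONLY/SEARCH_ONLY, as the comment suggests, and defines the ratio as zero when there are no allocations.

```java
import java.util.Map;

class RoleTotalsSketch {
    // Hypothetical stand-in for ShardRouting.Role, so the sketch compiles on its own.
    enum Role { DEFAULT, INDEX_ONLY, SEARCH_ONLY }

    // Sums per-role counts into one total, rejecting maps that mix DEFAULT with
    // INDEX_ONLY/SEARCH_ONLY (assumed here to be mutually exclusive).
    static long totalAcrossRoles(Map<Role, Long> countsByRole) {
        boolean hasDefault = countsByRole.containsKey(Role.DEFAULT);
        boolean hasSplitRoles = countsByRole.containsKey(Role.INDEX_ONLY)
            || countsByRole.containsKey(Role.SEARCH_ONLY);
        if (hasDefault && hasSplitRoles) {
            throw new IllegalStateException("DEFAULT mixed with INDEX_ONLY/SEARCH_ONLY: " + countsByRole);
        }
        return countsByRole.values().stream().mapToLong(Long::longValue).sum();
    }

    // Ratio of undesired to total allocations; zero when there are no allocations.
    static double undesiredRatio(long undesired, long total) {
        return total == 0 ? 0.0 : (double) undesired / total;
    }

    public static void main(String[] args) {
        Map<Role, Long> undesired = Map.of(Role.INDEX_ONLY, 3L, Role.SEARCH_ONLY, 1L);
        Map<Role, Long> totals = Map.of(Role.INDEX_ONLY, 6L, Role.SEARCH_ONLY, 2L);
        System.out.println(undesiredRatio(totalAcrossRoles(undesired), totalAcrossRoles(totals)));
    }
}
```

The ratio needs a per-role total as well as a per-role undesired count, which is why the comment asks for both.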

Member

Also, how do we get these two stats (indexTierAllocations and indexTierUndesiredAllocations) out of the balancer in the autoscaler? Is there a getter for these?

Contributor Author @nicktindall, Aug 11, 2025

> Another option is to just add the specific breakdowns we need here? index/searchTierAllocations and index/searchTierUndesiredAllocations?

As far as I can tell, we have no concept of "tier" in the ES codebase. There is role, but we seem to stop short of defining that as equivalent to a tier. I don't quite understand why, but I don't want to make assumptions about what role means with regard to tiers (see co.elastic.elasticsearch.stateless.allocation.StatelessAllocationDecider#canAllocateShardToNode; it's not even done there).

If we make two specific fields, we're baking in the assumptions that

  • Role.INDEX_ONLY means indexing tier, Role.SEARCH_ONLY means search tier and Role.DEFAULT means the only tier in a stateful deployment
  • the existing set of roles is fixed

So we'd probably need to add assertions that trip if these assumptions stop being valid. I think having a map of role to counts (possibly Role -> record RoleStats(int total, int undesired)) at least leaves the interpretation of the roles to the context they're used in.

Maybe these are valid assumptions? Perhaps there is some history regarding the modelling?
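
The `Role -> RoleStats` shape floated above could look roughly like the following. This is a sketch under stated assumptions rather than what the PR implements: `Role` is a stand-in for `ShardRouting.Role`, the `Builder` accumulator and its method names are hypothetical, and the counts are `long` rather than `int` to match the existing `AllocationStats` fields.

```java
import java.util.EnumMap;
import java.util.Map;

class RoleStatsSketch {
    // Hypothetical stand-in for ShardRouting.Role, so the sketch compiles on its own.
    enum Role { DEFAULT, INDEX_ONLY, SEARCH_ONLY }

    // Per-role totals; what a role "means" is left to whoever reads the map.
    record RoleStats(long total, long undesired) {
        static final RoleStats ZERO = new RoleStats(0, 0);

        // Returns a copy that counts one more allocation, undesired or not.
        RoleStats plus(boolean isUndesired) {
            return new RoleStats(total + 1, undesired + (isUndesired ? 1 : 0));
        }

        double undesiredRatio() {
            return total == 0 ? 0.0 : (double) undesired / total;
        }
    }

    // Hypothetical accumulator used while iterating over shards.
    static final class Builder {
        private final Map<Role, RoleStats> byRole = new EnumMap<>(Role.class);

        void add(Role role, boolean undesired) {
            // First sighting of a role stores ZERO.plus(...); later ones bump the existing entry.
            byRole.merge(role, RoleStats.ZERO.plus(undesired), (existing, ignored) -> existing.plus(undesired));
        }

        Map<Role, RoleStats> build() {
            return Map.copyOf(byRole);
        }
    }
}
```

With this shape the allocator only groups by role. A consumer that knows what INDEX_ONLY means in its own context can look up that entry and read its `undesiredRatio()`.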

Contributor Author

> Also, how do we get these two stats (indexTierAllocations and indexTierUndesiredAllocations) out of the balancer in the autoscaler? Is there a getter for these?

I added a getter and put up a serverless PR to illustrate that flow.

Member

> As far as I can tell we have no concept of "tier" in the ES codebase. There is role, but we seem to stop short of defining that as equivalent to a tier

:))) It feels like we're saying the same thing. index/searchTierAllocations and index/searchTierUndesiredAllocations are not good names, I agree with you. I was suggesting that, instead of the map, we could just add two more variables for the total allocations and undesired allocations of index_only (which could also be called promotable shards), since we don't need the rest. That's the only part of the map that we use for now, for the specific ticket that initiated this change. We could also use the map and ignore the rest; if you prefer that, it's fine.

Contributor Author

I think I prefer the map. It's a breakdown of those stats by node role. In this code we never cared about node role previously, and we should continue to not care.

If we pick out specific roles to populate fields, we're baking knowledge of the roles and their meaning into this code. I'd rather we tried to keep that knowledge in the stateless code, which is the only place we should see anything other than DEFAULT, and the only place we're using the per-role stats.

+ ) {}

public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
@@ -71,7 +78,7 @@ public record NodeWeightStats(long shardCount, double diskUsageInBytes, double w
public static final String CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME =
"es.allocator.allocations.node.forecasted_disk_usage_bytes.current";

- public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1);
+ public static final AllocationStats EMPTY_ALLOCATION_STATS = new AllocationStats(-1, -1, -1, Map.of());

private volatile boolean nodeIsMaster = false;

@@ -9,6 +9,9 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

+ import com.carrotsearch.hppc.ObjectLongHashMap;
+ import com.carrotsearch.hppc.ObjectLongMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.ArrayUtil;
@@ -40,6 +43,7 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+ import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
@@ -525,6 +529,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
int unassignedShards = routingNodes.unassigned().size() + routingNodes.unassigned().ignored().size();
int totalAllocations = 0;
int undesiredAllocationsExcludingShuttingDownNodes = 0;
+ final ObjectLongMap<ShardRouting.Role> undesiredAllocationsExcludingShuttingDownNodesByRole = new ObjectLongHashMap<>();

// Iterate over all started shards and try to move any which are on undesired nodes. In the presence of throttling shard
// movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are offloading the
@@ -553,6 +558,7 @@ private DesiredBalanceMetrics.AllocationStats balance() {
if (allocation.metadata().nodeShutdowns().contains(shardRouting.currentNodeId()) == false) {
// shard is not on a shutting down node, nor is it on a desired node per the previous check.
undesiredAllocationsExcludingShuttingDownNodes++;
+ undesiredAllocationsExcludingShuttingDownNodesByRole.addTo(shardRouting.role(), 1);
}

if (allocation.deciders().canRebalance(allocation).type() != Decision.Type.YES) {
@@ -595,7 +601,9 @@ private DesiredBalanceMetrics.AllocationStats balance() {
return new DesiredBalanceMetrics.AllocationStats(
unassignedShards,
totalAllocations,
- undesiredAllocationsExcludingShuttingDownNodes
+ undesiredAllocationsExcludingShuttingDownNodes,
+ StreamSupport.stream(undesiredAllocationsExcludingShuttingDownNodesByRole.spliterator(), false)
+ .collect(Collectors.toUnmodifiableMap(lc -> lc.key, lc -> lc.value))
);
}
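
For readers who don't know HPPC, the per-role counting in this hunk can be illustrated with plain JDK collections. This is only an illustration, not the PR's code. `Role` is a stand-in for `ShardRouting.Role`, and the `Shard` record, the class name and the method name are hypothetical simplifications of the routing and shutdown checks in `balance()`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerRoleCounterSketch {
    // Hypothetical stand-in for ShardRouting.Role, so the sketch compiles on its own.
    enum Role { DEFAULT, INDEX_ONLY, SEARCH_ONLY }

    // Hypothetical, simplified view of a started shard.
    record Shard(Role role, boolean onDesiredNode, boolean onShuttingDownNode) {}

    // Counts shards that are on an undesired node that is not shutting down, grouped by role.
    static Map<Role, Long> undesiredByRole(List<Shard> shards) {
        Map<Role, Long> counts = new HashMap<>();
        for (Shard shard : shards) {
            if (shard.onDesiredNode() == false && shard.onShuttingDownNode() == false) {
                counts.merge(shard.role(), 1L, Long::sum); // plays the part of addTo(role, 1)
            }
        }
        return Map.copyOf(counts); // unmodifiable snapshot, like toUnmodifiableMap in the diff
    }
}
```

A role with no undesired shards has no entry in the result. That is consistent with the updated tests in this PR, which expect `Map.of()` wherever the undesired count is 0, so consumers would read the map with something like `getOrDefault(role, 0L)`.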

@@ -27,7 +27,7 @@ public void testZeroAllMetrics() {
long unassignedShards = randomNonNegativeLong();
long totalAllocations = randomNonNegativeLong();
long undesiredAllocations = randomNonNegativeLong();
- metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of());
+ metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of());
assertEquals(totalAllocations, metrics.totalAllocations());
assertEquals(unassignedShards, metrics.unassignedShards());
assertEquals(undesiredAllocations, metrics.undesiredAllocations());
@@ -44,7 +44,7 @@ public void testMetricsAreOnlyPublishedWhenNodeIsMaster() {
long unassignedShards = randomNonNegativeLong();
long totalAllocations = randomLongBetween(100, 10000000);
long undesiredAllocations = randomLongBetween(0, totalAllocations);
- metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations), Map.of(), Map.of());
+ metrics.updateMetrics(new AllocationStats(unassignedShards, totalAllocations, undesiredAllocations, Map.of()), Map.of(), Map.of());

// Collect when not master
meterRegistry.getRecorder().collect();
@@ -104,7 +104,7 @@ public void testUndesiredAllocationRatioIsZeroWhenTotalShardsIsZero() {
RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
DesiredBalanceMetrics metrics = new DesiredBalanceMetrics(meterRegistry);
long unassignedShards = randomNonNegativeLong();
- metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0), Map.of(), Map.of());
+ metrics.updateMetrics(new AllocationStats(unassignedShards, 0, 0, Map.of()), Map.of(), Map.of());

metrics.setNodeIsMaster(true);
meterRegistry.getRecorder().collect();
@@ -281,7 +281,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
assertTrue(index1RoutingTable.primaryShard().unassigned());
assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
assertNotNull(allocationStats.get());
- assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(3, 1, 0, Map.of()), allocationStats.get());
}

// now relax the filter so that the replica of index-0 and the primary of index-1 can both be assigned to node-1, but the throttle
@@ -296,7 +296,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
assertTrue(index1RoutingTable.primaryShard().initializing());
assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
assertNotNull(allocationStats.get());
- assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(2, 2, 0, Map.of()), allocationStats.get());
}

final var stateWithStartedPrimariesAndInitializingReplica = startInitializingShardsAndReroute(
@@ -313,7 +313,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
assertTrue(index1RoutingTable.primaryShard().started());
assertTrue(index1RoutingTable.replicaShards().stream().allMatch(ShardRouting::unassigned));
assertNotNull(allocationStats.get());
- assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(1, 3, 0, Map.of()), allocationStats.get());
}
}

@@ -910,7 +910,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
assertThat(shardRouting.currentNodeId(), oneOf("node-0", "node-1"));
}
assertNotNull(allocationStats);
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get());

// Only allow allocation on two of the nodes, excluding the other two nodes.
clusterSettings.applySettings(
@@ -926,7 +926,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop())); // all still on desired nodes, no
// movement needed
assertNotNull(allocationStats);
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get());

desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3")));

@@ -937,12 +937,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
assertNotNull(allocationStats);
// Total allocations counts relocating and initializing shards, so the two relocating shards will be counted twice.
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get());

// Ensuring that we check the shortcut two-param canAllocate() method up front
canAllocateRef.set(Decision.NO);
assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get());
canAllocateRef.set(Decision.YES);

// Restore filter to default
@@ -980,7 +980,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
"test",
ActionListener.noop()
);
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 7, 3, Map.of(ShardRouting.Role.DEFAULT, 3L)), allocationStats.get());

assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
}
@@ -1048,7 +1048,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat

// All still on desired nodes, no movement needed, cluster state remains the same.
assertSame(clusterState, allocationService.reroute(clusterState, "test", ActionListener.noop()));
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 0, Map.of()), allocationStats.get());

desiredBalance.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeId.equals("node-2") || nodeId.equals("node-3")));

@@ -1076,15 +1076,15 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
assertThat(reroutedState.getRoutingNodes().node("node-0").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
assertThat(reroutedState.getRoutingNodes().node("node-1").numberOfShardsWithState(ShardRoutingState.RELOCATING), equalTo(1));
assertNotNull(allocationStats.get());
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 6, 6, Map.of(ShardRouting.Role.DEFAULT, 6L)), allocationStats.get());

// Test that the AllocationStats are still updated, even though throttling is active. The cluster state should remain unchanged
// due to throttling: the previous reroute request started relocating two shards and, since those reallocations have not
// been completed, no additional shard relocations can begin.
assertSame(reroutedState, allocationService.reroute(reroutedState, "test", ActionListener.noop()));
assertNotNull(allocationStats);
// Note: total allocations counts relocating and initializing shards, so the two relocating shards will be counted twice.
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4), allocationStats.get());
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, 8, 4, Map.of(ShardRouting.Role.DEFAULT, 4L)), allocationStats.get());
}

public void testDoNotRebalanceToTheNodeThatNoLongerExists() {
@@ -1293,7 +1293,7 @@ public void testRebalanceDoesNotCauseHotSpots() {

var initializing = shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING);
if (initializing.isEmpty()) {
- assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0), allocationStats);
+ assertEquals(new DesiredBalanceMetrics.AllocationStats(0, shardsPerNode * numberOfNodes, 0, Map.of()), allocationStats);
break;
}
