Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1b6837e
allocation: create separate limit for frozen tier concurrent rebalance
schase-es Sep 23, 2025
7f683ba
[CI] Update transport version definitions
Sep 23, 2025
56b76c8
[CI] Auto commit changes from spotless
Sep 23, 2025
763bf97
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 23, 2025
2ffa4d3
Formatting fixes
schase-es Sep 23, 2025
c8810f2
Adding unlimited test
schase-es Sep 23, 2025
9638803
Addressed or vs. and logic in the context of the allocation algorithm
schase-es Sep 24, 2025
9d111e9
[CI] Auto commit changes from spotless
Sep 24, 2025
3478f43
- Fixed up cluster-wide algorithm for applying frozen tier limits
schase-es Sep 24, 2025
ce8accd
Adding separate test for frozen
schase-es Sep 24, 2025
1ffda8e
[CI] Auto commit changes from spotless
Sep 24, 2025
cb754db
Check RoutingNode.node() for null in frozenNode test
schase-es Sep 24, 2025
eed71f4
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 24, 2025
ddf7eee
Addressed review/extra feedback
schase-es Sep 25, 2025
ecde747
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 25, 2025
3dde0a6
Adding in mixed frozen and hot test
schase-es Sep 26, 2025
7db6845
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 27, 2025
39b9d6b
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Oct 8, 2025
6681750
Test fixes
schase-es Oct 8, 2025
03fbb83
Set default concurrent rebalance for frozen to 2
schase-es Oct 9, 2025
05cbcf2
Default to normal setting
schase-es Oct 9, 2025
84bc726
Renaming strategy to allocationService
schase-es Oct 13, 2025
c798ad8
Fixed up decision message formatting, and added test
schase-es Oct 13, 2025
3d52ec1
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Oct 13, 2025
84148e7
Slight improvements to the test structure
schase-es Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ public Stream<ShardRouting> shardsWithState(ShardRoutingState state) {
return internalGetShardsWithState(state).stream();
}

/**
* Determine the number of shards with a specific state
* @param state ShardRoutingState which should be listed
* @return number of shards
*/
public int shardCountsWithState(ShardRoutingState state) {
return internalGetShardsWithState(state).size();
}

/**
* Determine the shards of an index with a specific state
* @param index id of the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private int relocatingShards = 0;

private int relocatingFrozenShards = 0;

private final Map<String, Set<String>> attributeValuesByAttribute;
private final Map<String, Recoveries> recoveriesPerNode;

Expand Down Expand Up @@ -152,6 +154,9 @@ private RoutingNodes(GlobalRoutingTable routingTable, DiscoveryNodes discoveryNo
assignedShardsAdd(shard);
if (shard.relocating()) {
relocatingShards++;
if (isFrozenNode(shard.currentNodeId())) {
relocatingFrozenShards++;
}
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
addInitialRecovery(targetShardRouting, indexShard.primary);
// LinkedHashMap to preserve order.
Expand Down Expand Up @@ -192,6 +197,7 @@ private RoutingNodes(RoutingNodes routingNodes) {
this.inactivePrimaryCount = routingNodes.inactivePrimaryCount;
this.inactiveShardCount = routingNodes.inactiveShardCount;
this.relocatingShards = routingNodes.relocatingShards;
this.relocatingFrozenShards = routingNodes.relocatingFrozenShards;
this.attributeValuesByAttribute = Collections.synchronizedMap(Maps.copyOf(routingNodes.attributeValuesByAttribute, HashSet::new));
this.recoveriesPerNode = Maps.copyOf(routingNodes.recoveriesPerNode, Recoveries::copy);
}
Expand Down Expand Up @@ -343,6 +349,18 @@ public int getRelocatingShardCount() {
return relocatingShards;
}

private boolean isFrozenNode(String nodeId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Perhaps we should be explicit that we're looking for "dedicated" frozen nodes in the method name?

RoutingNode node = nodesToShards.get(nodeId);
if (node != null && node.node() != null && node.node().isDedicatedFrozenNode()) {
return true;
}
return false;
}

public int getRelocatingFrozenShardCount() {
return relocatingFrozenShards;
}

/**
* Returns all shards that are not in the state UNASSIGNED with the same shard
* ID as the given shard.
Expand Down Expand Up @@ -478,6 +496,9 @@ public Tuple<ShardRouting, ShardRouting> relocateShard(
) {
ensureMutable();
relocatingShards++;
if (isFrozenNode(nodeId)) {
relocatingFrozenShards++;
}
ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
ShardRouting target = source.getTargetRelocatingShard();
updateAssigned(startedShard, source);
Expand Down Expand Up @@ -726,6 +747,9 @@ private ShardRouting started(ShardRouting shard, long expectedShardSize) {
*/
private ShardRouting cancelRelocation(ShardRouting shard) {
relocatingShards--;
if (isFrozenNode(shard.currentNodeId())) {
relocatingFrozenShards--;
}
ShardRouting cancelledShard = shard.cancelRelocation();
updateAssigned(shard, cancelledShard);
return cancelledShard;
Expand Down Expand Up @@ -881,6 +905,7 @@ public boolean equals(Object o) {
&& inactivePrimaryCount == that.inactivePrimaryCount
&& inactiveShardCount == that.inactiveShardCount
&& relocatingShards == that.relocatingShards
&& relocatingFrozenShards == that.relocatingFrozenShards
&& nodesToShards.equals(that.nodesToShards)
&& unassignedShards.equals(that.unassignedShards)
&& assignedShards.equals(that.assignedShards)
Expand All @@ -898,6 +923,7 @@ public int hashCode() {
inactivePrimaryCount,
inactiveShardCount,
relocatingShards,
relocatingFrozenShards,
attributeValuesByAttribute,
recoveriesPerNode
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
/**
* Similar to the {@link ClusterRebalanceAllocationDecider} this
* {@link AllocationDecider} controls the number of currently in-progress
* re-balance (relocation) operations and restricts node allocations if the
* configured threshold is reached. The default number of concurrent rebalance
* operations is set to {@code 2}
* re-balance (shard relocation) operations and restricts node allocations
* if the configured threshold is reached. Frozen and non-frozen shards are
* considered separately. The default number of concurrent rebalance operations
* is set to {@code 2} for non-frozen shards, and {@code 10} for frozen shards.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc is out of date with the implementation.

* <p>
* Re-balance operations can be controlled in real-time via the cluster update API using
* {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this
* setting is set to {@code -1} the number of concurrent re-balance operations
* {@code cluster.routing.allocation.cluster_concurrent_rebalance} and
* {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}.
* Iff either setting is set to {@code -1} the number of concurrent re-balance operations
* are unlimited.
*/
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
Expand All @@ -44,21 +46,89 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
);
private volatile int clusterConcurrentRebalance;

/**
* Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards
*/
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting(
"cluster.routing.allocation.cluster_concurrent_frozen_rebalance",
10,
-1,
Property.Dynamic,
Property.NodeScope
);
private volatile int clusterConcurrentFrozenRebalance;

public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
this::setClusterConcurrentRebalance
);
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING,
this::setClusterConcurrentFrozenRebalance
);
logger.debug(
"using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]",
clusterConcurrentRebalance,
clusterConcurrentFrozenRebalance
);
}

private void setClusterConcurrentRebalance(int concurrentRebalance) {
clusterConcurrentRebalance = concurrentRebalance;
}

private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) {
clusterConcurrentFrozenRebalance = concurrentFrozenRebalance;
}

@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return canRebalance(allocation);
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
if (isFrozenShard(allocation, shardRouting)) {
if (clusterConcurrentFrozenRebalance == -1) {
return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed");
}
if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) {
return allocation.decision(
Decision.THROTTLE,
NAME,
"reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]",
relocatingFrozenShards,
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
clusterConcurrentFrozenRebalance
);
}
return allocation.decision(
Decision.YES,
NAME,
"below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
clusterConcurrentFrozenRebalance,
relocatingFrozenShards
);
} else {
int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - relocatingFrozenShards;
if (clusterConcurrentRebalance == -1) {
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
if (relocatingShards >= clusterConcurrentRebalance) {
return allocation.decision(
Decision.THROTTLE,
NAME,
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]",
relocatingShards,
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
clusterConcurrentRebalance
);
}
return allocation.decision(
Decision.YES,
NAME,
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
clusterConcurrentRebalance,
relocatingShards
);
}
}

/**
Expand All @@ -68,33 +138,58 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
*/
@Override
public Decision canRebalance(RoutingAllocation allocation) {
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
if (allocation.isSimulating() && relocatingShards >= 2) {
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2).
// (See https://github.com/elastic/elasticsearch/issues/87279)
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation
// it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance.
// Separately: keep overall limit in simulation to two including frozen shards
return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, this whole branch might be redundant (?) since #134786

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is helpful -- can we leave this to a separate PR? This may not be the only decider that needs to change.

}
if (clusterConcurrentRebalance == -1) {
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
if (relocatingShards >= clusterConcurrentRebalance) {

// separate into frozen/non-frozen counts
relocatingShards = relocatingShards - relocatingFrozenShards;

// either frozen or non-frozen having some allowance before their limit means the allocator has room to rebalance
if (clusterConcurrentRebalance == -1 || relocatingShards < clusterConcurrentRebalance) {
return allocation.decision(
Decision.THROTTLE,
Decision.YES,
NAME,
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]",
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
relocatingShards,
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
clusterConcurrentRebalance
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the string template doesn't match the parameters now? we used to have three placeholders, now we only have two. I wonder if we should add a unit test for the decider, there doesn't seem to be one already, but I have seen them implemented for other deciders, it would make it easy to test all these branches. See org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDeciderTests#testWriteLoadDeciderCanAllocate for example?

);
}
if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) {
return allocation.decision(
Decision.YES,
NAME,
"below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
relocatingFrozenShards,
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
clusterConcurrentFrozenRebalance
);
}
return allocation.decision(
Decision.YES,
Decision.THROTTLE,
NAME,
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
"above threshold [%d] for concurrent rebalances, current rebalance shard count [%d], "
+ "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
clusterConcurrentRebalance,
relocatingShards
relocatingShards,
clusterConcurrentFrozenRebalance,
relocatingFrozenShards
);
}

private boolean isFrozenShard(RoutingAllocation allocation, ShardRouting shard) {
String nodeId = shard.currentNodeId();
if (nodeId != null && allocation.routingNodes().node(nodeId).node().isDedicatedFrozenNode()) {
return true;
}
return false;
}
Copy link
Contributor

@nicktindall nicktindall Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic for determining what constitutes a frozen shard we should check with wiser heads. We are considering only shards moving off of a dedicated frozen node as being "frozen" shards.

I think this is an acceptable approximation because:

  • We recommend self managed deployments use dedicated frozen nodes and we only have dedicated frozen nodes in ECH (I believe), so "dedicated frozen node" is equivalent to just "frozen node" if things are configured as we recommend. I think it's OK to not allow more rebalancing in the event someone has gone outside our recommendations and created non-dedicated frozen nodes.
    • If I recall correctly, part of the justification for the change was that the frozen and non-frozen nodes are mutually exclusive
    • There is an edge case where we'd count shards relocating from dedicated to non-dedicated frozen nodes, but not vice versa, but again that seems outside of our recommendations and not worth going to great effort to remedy? if we want to we be strict about that we could also check the relocatingNodeId for dedicated frozen-nes?.
  • To determine whether a shard is a frozen shard we need to look at the index metadata, which we could do in canRebalance(shardRouting, allocation) via the cluster state and checking IndexMetadata#getTierPreference(), but it looks like knowledge of specific data tiers lives in x-pack as it stands, and I wonder if we want that to leak out. It looks like tier preference is largely opaque to the rest of the code base, I may have misinterpreted that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to check with someone else, too.

I wrote the frozen/non-frozen counts, so that frozen is a subset count of all relocations. If a relocation isn't counted as frozen, then it's counted as non-frozen. So if something is mistaken, it's just subjected to a tighter constraint.

I can add a flag to ShardRouting, but as we discussed (and you're reminding me here) it may be too far for the data tier concept to propagate.

}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BreakerSettings.CIRCUIT_BREAKER_TYPE,
ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
Expand Down
Loading