Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1b6837e
allocation: create separate limit for frozen tier concurrent rebalance
schase-es Sep 23, 2025
7f683ba
[CI] Update transport version definitions
Sep 23, 2025
56b76c8
[CI] Auto commit changes from spotless
Sep 23, 2025
763bf97
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 23, 2025
2ffa4d3
Formatting fixes
schase-es Sep 23, 2025
c8810f2
Adding unlimited test
schase-es Sep 23, 2025
9638803
Addressed or vs. and logic in the context of the allocation algorithm
schase-es Sep 24, 2025
9d111e9
[CI] Auto commit changes from spotless
Sep 24, 2025
3478f43
- Fixed up cluster-wide algorithm for applying frozen tier limits
schase-es Sep 24, 2025
ce8accd
Adding separate test for frozen
schase-es Sep 24, 2025
1ffda8e
[CI] Auto commit changes from spotless
Sep 24, 2025
cb754db
Check RoutingNode.node() for null in frozenNode test
schase-es Sep 24, 2025
eed71f4
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 24, 2025
ddf7eee
Addressed review/extra feedback
schase-es Sep 25, 2025
ecde747
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 25, 2025
3dde0a6
Adding in mixed frozen and hot test
schase-es Sep 26, 2025
7db6845
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Sep 27, 2025
39b9d6b
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Oct 8, 2025
6681750
Test fixes
schase-es Oct 8, 2025
03fbb83
Set default concurrent rebalance for frozen to 2
schase-es Oct 9, 2025
05cbcf2
Default to normal setting
schase-es Oct 9, 2025
84bc726
Renaming strategy to allocationService
schase-es Oct 13, 2025
c798ad8
Fixed up decision message formatting, and added test
schase-es Oct 13, 2025
3d52ec1
Merge branch 'main' into ES-11303_concurrent-frozen-tier-activity_rebase
schase-es Oct 13, 2025
84148e7
Slight improvements to the test structure
schase-es Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private int relocatingShards = 0;

private int relocatingFrozenShards = 0;

private final Map<String, Set<String>> attributeValuesByAttribute;
private final Map<String, Recoveries> recoveriesPerNode;

Expand Down Expand Up @@ -152,6 +154,9 @@ private RoutingNodes(GlobalRoutingTable routingTable, DiscoveryNodes discoveryNo
assignedShardsAdd(shard);
if (shard.relocating()) {
relocatingShards++;
if (isDedicatedFrozenNode(shard.currentNodeId())) {
relocatingFrozenShards++;
}
ShardRouting targetShardRouting = shard.getTargetRelocatingShard();
addInitialRecovery(targetShardRouting, indexShard.primary);
// LinkedHashMap to preserve order.
Expand Down Expand Up @@ -192,6 +197,7 @@ private RoutingNodes(RoutingNodes routingNodes) {
this.inactivePrimaryCount = routingNodes.inactivePrimaryCount;
this.inactiveShardCount = routingNodes.inactiveShardCount;
this.relocatingShards = routingNodes.relocatingShards;
this.relocatingFrozenShards = routingNodes.relocatingFrozenShards;
this.attributeValuesByAttribute = Collections.synchronizedMap(Maps.copyOf(routingNodes.attributeValuesByAttribute, HashSet::new));
this.recoveriesPerNode = Maps.copyOf(routingNodes.recoveriesPerNode, Recoveries::copy);
}
Expand Down Expand Up @@ -343,6 +349,18 @@ public int getRelocatingShardCount() {
return relocatingShards;
}

/**
 * Tells whether the routing node registered under {@code nodeId} is a dedicated frozen node.
 *
 * @param nodeId id used to look the routing node up in {@code nodesToShards}
 * @return {@code true} only if a routing node exists for the id, it carries a discovery node,
 *         and that discovery node reports itself as a dedicated frozen node; {@code false} otherwise
 */
public boolean isDedicatedFrozenNode(String nodeId) {
    final RoutingNode routingNode = nodesToShards.get(nodeId);
    // Unknown node id, or a routing node without an attached discovery node: not frozen.
    if (routingNode == null || routingNode.node() == null) {
        return false;
    }
    return routingNode.node().isDedicatedFrozenNode();
}

/**
 * @return the current value of the {@code relocatingFrozenShards} counter, i.e. the number of
 *         relocations tracked against dedicated frozen nodes
 */
public int getRelocatingFrozenShardCount() {
    return this.relocatingFrozenShards;
}

/**
* Returns all shards that are not in the state UNASSIGNED with the same shard
* ID as the given shard.
Expand Down Expand Up @@ -478,6 +496,9 @@ public Tuple<ShardRouting, ShardRouting> relocateShard(
) {
ensureMutable();
relocatingShards++;
if (isDedicatedFrozenNode(nodeId)) {
relocatingFrozenShards++;
}
ShardRouting source = startedShard.relocate(nodeId, expectedShardSize);
ShardRouting target = source.getTargetRelocatingShard();
updateAssigned(startedShard, source);
Expand Down Expand Up @@ -726,6 +747,9 @@ private ShardRouting started(ShardRouting shard, long expectedShardSize) {
*/
private ShardRouting cancelRelocation(ShardRouting shard) {
relocatingShards--;
if (isDedicatedFrozenNode(shard.currentNodeId())) {
relocatingFrozenShards--;
}
ShardRouting cancelledShard = shard.cancelRelocation();
updateAssigned(shard, cancelledShard);
return cancelledShard;
Expand Down Expand Up @@ -881,6 +905,7 @@ public boolean equals(Object o) {
&& inactivePrimaryCount == that.inactivePrimaryCount
&& inactiveShardCount == that.inactiveShardCount
&& relocatingShards == that.relocatingShards
&& relocatingFrozenShards == that.relocatingFrozenShards
&& nodesToShards.equals(that.nodesToShards)
&& unassignedShards.equals(that.unassignedShards)
&& assignedShards.equals(that.assignedShards)
Expand All @@ -898,6 +923,7 @@ public int hashCode() {
inactivePrimaryCount,
inactiveShardCount,
relocatingShards,
relocatingFrozenShards,
attributeValuesByAttribute,
recoveriesPerNode
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
/**
* Similar to the {@link ClusterRebalanceAllocationDecider} this
* {@link AllocationDecider} controls the number of currently in-progress
* re-balance (relocation) operations and restricts node allocations if the
* configured threshold is reached. The default number of concurrent rebalance
* operations is set to {@code 2}
* re-balance (shard relocation) operations and restricts node allocations
* if the configured threshold is reached. Frozen and non-frozen shards are
* considered separately. The default number of concurrent rebalance operations
* is {@code 2} for non-frozen shards; the frozen limit defaults to the value of the non-frozen setting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc is out of date with the implementation.

* <p>
* Re-balance operations can be controlled in real-time via the cluster update API using
* {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this
* setting is set to {@code -1} the number of concurrent re-balance operations
* {@code cluster.routing.allocation.cluster_concurrent_rebalance} and
* {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}.
* Setting either one to {@code -1} makes the number of concurrent re-balance operations
* unlimited for the shards (non-frozen or frozen, respectively) that the setting governs.
*/
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
Expand All @@ -44,21 +46,91 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
);
private volatile int clusterConcurrentRebalance;

/**
 * Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards.
 * <p>
 * Defaults to the same value as normal concurrent rebalance, if unspecified: the setting is declared
 * with {@link #CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING} as its fallback.
 * A value of {@code -1} is treated by this decider as "unlimited concurrent frozen rebalances".
 */
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting(
"cluster.routing.allocation.cluster_concurrent_frozen_rebalance",
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
// NOTE(review): presumably the minimum accepted value -- confirm against Setting.intSetting
-1,
Property.Dynamic,
Property.NodeScope
);
// Last value received for the frozen setting; written by setClusterConcurrentFrozenRebalance.
private volatile int clusterConcurrentFrozenRebalance;

/**
 * Creates the decider and hooks both concurrent-rebalance limits up to the cluster settings.
 * <p>
 * Each setting is passed to {@code initializeAndWatch} together with the setter that stores its
 * value in the corresponding volatile field. The resulting limits are reported once at debug level.
 *
 * @param clusterSettings cluster settings registry the two limits are read from
 */
public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) {
    clusterSettings.initializeAndWatch(
        CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
        this::setClusterConcurrentRebalance
    );
    clusterSettings.initializeAndWatch(
        CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING,
        this::setClusterConcurrentFrozenRebalance
    );
    // One combined message; the former single-value debug line was redundant with this one.
    logger.debug(
        "using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]",
        clusterConcurrentRebalance,
        clusterConcurrentFrozenRebalance
    );
}

/**
 * Settings callback: records the latest non-frozen concurrent rebalance limit.
 *
 * @param concurrentRebalance new limit; {@code -1} is interpreted by this decider as unlimited
 */
private void setClusterConcurrentRebalance(int concurrentRebalance) {
    this.clusterConcurrentRebalance = concurrentRebalance;
}

/**
 * Settings callback: records the latest frozen-tier concurrent rebalance limit.
 *
 * @param concurrentFrozenRebalance new limit; {@code -1} is interpreted by this decider as unlimited
 */
private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) {
    this.clusterConcurrentFrozenRebalance = concurrentFrozenRebalance;
}

/**
 * Decides whether the given shard may be rebalanced, applying the limit of the tier it belongs to.
 * <p>
 * A shard whose current node is a dedicated frozen node is checked against the frozen limit and the
 * count of relocating frozen shards; any other shard is checked against the regular limit and the
 * count of relocating shards minus the frozen ones. A limit of {@code -1} means unlimited.
 *
 * @param shardRouting the shard considered for rebalancing
 * @param allocation   the current allocation, used for the relocation counters and to build the decision
 * @return {@code YES} when the relevant limit is unlimited or not yet reached, {@code THROTTLE} otherwise
 */
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
    // Note: no unconditional delegation to canRebalance(allocation) here -- that would make the
    // per-tier checks below unreachable.
    int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
    if (allocation.routingNodes().isDedicatedFrozenNode(shardRouting.currentNodeId())) {
        if (clusterConcurrentFrozenRebalance == -1) {
            return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed");
        }
        if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) {
            return allocation.decision(
                Decision.THROTTLE,
                NAME,
                "reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]",
                relocatingFrozenShards,
                CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
                clusterConcurrentFrozenRebalance
            );
        }
        return allocation.decision(
            Decision.YES,
            NAME,
            "below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
            clusterConcurrentFrozenRebalance,
            relocatingFrozenShards
        );
    } else {
        // The overall counter includes frozen relocations; subtract them to get the non-frozen count.
        int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - relocatingFrozenShards;
        if (clusterConcurrentRebalance == -1) {
            return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
        }
        if (relocatingShards >= clusterConcurrentRebalance) {
            return allocation.decision(
                Decision.THROTTLE,
                NAME,
                "reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]",
                relocatingShards,
                CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
                clusterConcurrentRebalance
            );
        }
        return allocation.decision(
            Decision.YES,
            NAME,
            "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
            clusterConcurrentRebalance,
            relocatingShards
        );
    }
}

/**
Expand All @@ -68,33 +140,52 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
*/
@Override
public Decision canRebalance(RoutingAllocation allocation) {
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount();
int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
if (allocation.isSimulating() && relocatingShards >= 2) {
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2).
// (See https://github.com/elastic/elasticsearch/issues/87279)
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation
// it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance.
// Separately: keep overall limit in simulation to two including frozen shards
return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, this whole branch might be redundant (?) since #134786

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is helpful -- can we leave this to a separate PR? This may not be the only decider that needs to change.

}
if (clusterConcurrentRebalance == -1) {
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed");
}
if (relocatingShards >= clusterConcurrentRebalance) {

// separate into frozen/non-frozen counts
relocatingShards = relocatingShards - relocatingFrozenShards;

// either frozen or non-frozen having some allowance before their limit means the allocator has room to rebalance
if (clusterConcurrentRebalance == -1 || relocatingShards < clusterConcurrentRebalance) {
return allocation.decision(
Decision.THROTTLE,
Decision.YES,
NAME,
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]",
relocatingShards,
"below threshold [%s=%d] for concurrent rebalances, current rebalance shard count [%d]",
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
clusterConcurrentRebalance
clusterConcurrentRebalance,
relocatingShards
);
}
if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) {
return allocation.decision(
Decision.YES,
NAME,
"below threshold [%s=%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]",
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
clusterConcurrentFrozenRebalance,
relocatingFrozenShards
);
}
return allocation.decision(
Decision.YES,
Decision.THROTTLE,
NAME,
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]",
"reached the limit of concurrently rebalancing shards [%d] for concurrent rebalances, cluster setting [%s=%d], "
+ "and [%d] for concurrent frozen rebalances, frozen cluster setting [%s=%d]",
relocatingShards,
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(),
clusterConcurrentRebalance,
relocatingShards
relocatingFrozenShards,
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(),
clusterConcurrentFrozenRebalance
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BreakerSettings.CIRCUIT_BREAKER_TYPE,
ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,
FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING,
Expand Down
Loading