-
Notifications
You must be signed in to change notification settings - Fork 25.6k
allocation: create separate limit for frozen tier concurrent rebalance #135243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
1b6837e
7f683ba
56b76c8
763bf97
2ffa4d3
c8810f2
9638803
9d111e9
3478f43
ce8accd
1ffda8e
cb754db
eed71f4
ddf7eee
ecde747
3dde0a6
7db6845
39b9d6b
6681750
03fbb83
05cbcf2
84bc726
c798ad8
3d52ec1
84148e7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,15 @@ | |
/** | ||
* Similar to the {@link ClusterRebalanceAllocationDecider} this | ||
* {@link AllocationDecider} controls the number of currently in-progress | ||
* re-balance (relocation) operations and restricts node allocations if the | ||
* configured threshold is reached. The default number of concurrent rebalance | ||
* operations is set to {@code 2} | ||
* re-balance (shard relocation) operations and restricts node allocations | ||
* if the configured threshold is reached. Frozen and non-frozen shards are | ||
* considered separately. The default number of concurrent rebalance operations | ||
* is set to {@code 2} for non-frozen shards, and {@code 10} for frozen shards. | ||
* <p> | ||
* Re-balance operations can be controlled in real-time via the cluster update API using | ||
* {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this | ||
* setting is set to {@code -1} the number of concurrent re-balance operations | ||
* {@code cluster.routing.allocation.cluster_concurrent_rebalance} and | ||
* {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}. | ||
* Iff either setting is set to {@code -1} the number of concurrent re-balance operations | ||
* are unlimited. | ||
*/ | ||
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { | ||
|
@@ -44,21 +46,89 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { | |
); | ||
private volatile int clusterConcurrentRebalance; | ||
|
||
/** | ||
* Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards | ||
*/ | ||
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting( | ||
"cluster.routing.allocation.cluster_concurrent_frozen_rebalance", | ||
10, | ||
-1, | ||
Property.Dynamic, | ||
Property.NodeScope | ||
); | ||
private volatile int clusterConcurrentFrozenRebalance; | ||
|
||
public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) { | ||
clusterSettings.initializeAndWatch( | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, | ||
this::setClusterConcurrentRebalance | ||
); | ||
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); | ||
clusterSettings.initializeAndWatch( | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, | ||
this::setClusterConcurrentFrozenRebalance | ||
); | ||
logger.debug( | ||
"using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]", | ||
clusterConcurrentRebalance, | ||
clusterConcurrentFrozenRebalance | ||
); | ||
} | ||
|
||
private void setClusterConcurrentRebalance(int concurrentRebalance) { | ||
clusterConcurrentRebalance = concurrentRebalance; | ||
} | ||
|
||
private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) { | ||
clusterConcurrentFrozenRebalance = concurrentFrozenRebalance; | ||
} | ||
|
||
@Override | ||
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { | ||
return canRebalance(allocation); | ||
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); | ||
nicktindall marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (allocation.routingNodes().isDedicatedFrozenNode(shardRouting.currentNodeId())) { | ||
if (clusterConcurrentFrozenRebalance == -1) { | ||
return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); | ||
} | ||
if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { | ||
return allocation.decision( | ||
Decision.THROTTLE, | ||
NAME, | ||
"reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]", | ||
relocatingFrozenShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentFrozenRebalance | ||
); | ||
} | ||
return allocation.decision( | ||
Decision.YES, | ||
NAME, | ||
"below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", | ||
clusterConcurrentFrozenRebalance, | ||
relocatingFrozenShards | ||
); | ||
} else { | ||
int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - relocatingFrozenShards; | ||
if (clusterConcurrentRebalance == -1) { | ||
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); | ||
} | ||
if (relocatingShards >= clusterConcurrentRebalance) { | ||
return allocation.decision( | ||
Decision.THROTTLE, | ||
NAME, | ||
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]", | ||
relocatingShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentRebalance | ||
); | ||
} | ||
return allocation.decision( | ||
Decision.YES, | ||
NAME, | ||
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", | ||
clusterConcurrentRebalance, | ||
relocatingShards | ||
); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -68,33 +138,52 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca | |
*/ | ||
@Override | ||
public Decision canRebalance(RoutingAllocation allocation) { | ||
nicktindall marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); | ||
int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); | ||
if (allocation.isSimulating() && relocatingShards >= 2) { | ||
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2). | ||
// (See https://github.com/elastic/elasticsearch/issues/87279) | ||
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation | ||
// it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance. | ||
// Separately: keep overall limit in simulation to two including frozen shards | ||
return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed, this whole branch might be redundant (?) since #134786 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is helpful -- can we leave this to a separate PR? This may not be the only decider that needs to change. |
||
} | ||
if (clusterConcurrentRebalance == -1) { | ||
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); | ||
} | ||
if (relocatingShards >= clusterConcurrentRebalance) { | ||
|
||
// separate into frozen/non-frozen counts | ||
relocatingShards = relocatingShards - relocatingFrozenShards; | ||
|
||
// either frozen or non-frozen having some allowance before their limit means the allocator has room to rebalance | ||
if (clusterConcurrentRebalance == -1 || relocatingShards < clusterConcurrentRebalance) { | ||
return allocation.decision( | ||
Decision.THROTTLE, | ||
Decision.YES, | ||
NAME, | ||
"reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]", | ||
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", | ||
relocatingShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentRebalance | ||
|
||
); | ||
} | ||
if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) { | ||
return allocation.decision( | ||
Decision.YES, | ||
NAME, | ||
"below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", | ||
relocatingFrozenShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentFrozenRebalance | ||
); | ||
nicktindall marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return allocation.decision( | ||
Decision.YES, | ||
Decision.THROTTLE, | ||
NAME, | ||
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", | ||
"reached the limit of concurrently rebalancing shards [%d] for concurrent rebalances, cluster setting [%s=%d], " | ||
+ "and [%d] for concurrent frozen rebalances, frozen cluster setting [%s=%d]", | ||
relocatingShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentRebalance, | ||
relocatingShards | ||
relocatingFrozenShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentFrozenRebalance | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doc is out of date with the implementation.