allocation: create separate limit for frozen tier concurrent rebalance #135243
ConcurrentRebalanceAllocationDecider.java
|
@@ -12,6 +12,7 @@ | |
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
import org.elasticsearch.cluster.routing.ShardRoutingState; | ||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; | ||
import org.elasticsearch.common.settings.ClusterSettings; | ||
import org.elasticsearch.common.settings.Setting; | ||
|
@@ -20,13 +21,15 @@ | |
/** | ||
* Similar to the {@link ClusterRebalanceAllocationDecider} this | ||
* {@link AllocationDecider} controls the number of currently in-progress | ||
* re-balance (relocation) operations and restricts node allocations if the | ||
* configured threshold is reached. The default number of concurrent rebalance | ||
* operations is set to {@code 2} | ||
* re-balance (shard relocation) operations and restricts node allocations | ||
* if the configured threshold is reached. Frozen and non-frozen shards are | ||
* considered separately. The default number of concurrent rebalance operations | ||
* is set to {@code 2} for non-frozen shards, and {@code 10} for frozen shards. | ||
Review comment: This doc is out of date with the implementation. |
||
* <p> | ||
* Re-balance operations can be controlled in real-time via the cluster update API using | ||
* {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this | ||
* setting is set to {@code -1} the number of concurrent re-balance operations | ||
* {@code cluster.routing.allocation.cluster_concurrent_rebalance} and | ||
* {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}. | ||
* Iff either setting is set to {@code -1} the number of concurrent re-balance operations | ||
* are unlimited. | ||
*/ | ||
public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { | ||
|
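For reference, a minimal sketch (not part of this change) of how the decider ends up watching both limits. It assumes the constructor shown above and the usual org.elasticsearch.cluster.routing.allocation.decider package, and registers only the two settings in an ad-hoc ClusterSettings; in a real node both settings would instead be part of the built-in cluster settings list.

```java
import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.util.Set;

// Sketch only: wires the decider to a ClusterSettings instance that knows about
// both the non-frozen and the frozen rebalance limits.
class FrozenRebalanceWiringSketch {

    static ConcurrentRebalanceAllocationDecider buildDecider() {
        // Node-level settings; omitted keys fall back to the defaults (2 and 10).
        Settings nodeSettings = Settings.builder()
            .put("cluster.routing.allocation.cluster_concurrent_rebalance", 2)
            .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 10)
            .build();

        // Register only the two settings this decider watches (assumption: in
        // production they are registered via ClusterSettings.BUILT_IN_CLUSTER_SETTINGS).
        ClusterSettings clusterSettings = new ClusterSettings(
            nodeSettings,
            Set.<Setting<?>>of(
                ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
                ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING
            )
        );

        ConcurrentRebalanceAllocationDecider decider = new ConcurrentRebalanceAllocationDecider(clusterSettings);

        // Dynamic updates (for example via the cluster settings API) reach the
        // decider through the watchers registered in its constructor.
        clusterSettings.applySettings(
            Settings.builder().put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1).build()
        );
        return decider;
    }
}
```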
@@ -44,18 +47,42 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { | |
); | ||
private volatile int clusterConcurrentRebalance; | ||
|
||
/** | ||
* Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards | ||
*/ | ||
public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting( | ||
"cluster.routing.allocation.cluster_concurrent_frozen_rebalance", | ||
10, | ||
-1, | ||
Property.Dynamic, | ||
Property.NodeScope | ||
); | ||
private volatile int clusterConcurrentFrozenRebalance; | ||
|
||
public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) { | ||
clusterSettings.initializeAndWatch( | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, | ||
this::setClusterConcurrentRebalance | ||
); | ||
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); | ||
clusterSettings.initializeAndWatch( | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, | ||
this::setClusterConcurrentFrozenRebalance | ||
); | ||
logger.debug( | ||
"using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]", | ||
clusterConcurrentRebalance, | ||
clusterConcurrentFrozenRebalance | ||
); | ||
} | ||
|
||
private void setClusterConcurrentRebalance(int concurrentRebalance) { | ||
clusterConcurrentRebalance = concurrentRebalance; | ||
} | ||
|
||
private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) { | ||
clusterConcurrentFrozenRebalance = concurrentFrozenRebalance; | ||
} | ||
|
||
@Override | ||
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { | ||
return canRebalance(allocation); | ||
|
@@ -68,18 +95,25 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca | |
*/ | ||
@Override | ||
public Decision canRebalance(RoutingAllocation allocation) { | ||
|
||
int relocatingFrozenShards = countRelocatingFrozenShards(allocation); | ||
int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); | ||
if (allocation.isSimulating() && relocatingShards >= 2) { | ||
// BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2). | ||
// (See https://github.com/elastic/elasticsearch/issues/87279) | ||
// Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation | ||
// it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance. | ||
// Separately: keep overall limit in simulation to two including frozen shards | ||
return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating"); | ||
Review comment: As discussed, this whole branch might be redundant (?) since #134786
Review comment: This is helpful -- can we leave this to a separate PR? This may not be the only decider that needs to change. |
||
} | ||
if (clusterConcurrentRebalance == -1) { | ||
boolean normalRelocationUnlimited = clusterConcurrentRebalance == -1; | ||
boolean frozenRelocationUnlimited = clusterConcurrentFrozenRebalance == -1; | ||
if (normalRelocationUnlimited && frozenRelocationUnlimited) { | ||
return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); | ||
} | ||
if (relocatingShards >= clusterConcurrentRebalance) { | ||
|
||
// separate into frozen/non-frozen counts | ||
relocatingShards = relocatingShards - relocatingFrozenShards; | ||
if (normalRelocationUnlimited == false && relocatingShards >= clusterConcurrentRebalance) { | ||
return allocation.decision( | ||
Decision.THROTTLE, | ||
NAME, | ||
|
@@ -89,12 +123,35 @@ public Decision canRebalance(RoutingAllocation allocation) { | |
clusterConcurrentRebalance | ||
|
||
); | ||
} | ||
if (frozenRelocationUnlimited == false && relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { | ||
return allocation.decision( | ||
Decision.THROTTLE, | ||
NAME, | ||
"reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]", | ||
relocatingFrozenShards, | ||
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), | ||
clusterConcurrentFrozenRebalance | ||
); | ||
|
||
} | ||
return allocation.decision( | ||
Decision.YES, | ||
NAME, | ||
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", | ||
"below threshold [%d] for concurrent rebalances, current rebalance shard count [%d], " | ||
+ "and threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", | ||
clusterConcurrentRebalance, | ||
relocatingShards | ||
relocatingShards, | ||
clusterConcurrentFrozenRebalance, | ||
relocatingFrozenShards | ||
); | ||
} | ||
|
||
private int countRelocatingFrozenShards(RoutingAllocation allocation) { | ||
int frozenRelocations = 0; | ||
for (var routingNode : allocation.routingNodes()) { | ||
if (routingNode.node().isDedicatedFrozenNode()) { | ||
frozenRelocations += routingNode.shardCountsWithState(ShardRoutingState.RELOCATING); | ||
} | ||
} | ||
return frozenRelocations; | ||
} | ||
|
||
} |
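To summarize the decision flow in the diff above without the RoutingAllocation plumbing, here is a hedged, dependency-free restatement. Names are illustrative, not part of the PR, and the simulation branch that throttles at two relocations is deliberately left out; the authoritative version is canRebalance(RoutingAllocation) above.

```java
// Verdict mirrors Decision.YES / Decision.THROTTLE from the decider.
enum RebalanceVerdict { YES, THROTTLE }

final class TwoLimitCheckSketch {
    static RebalanceVerdict check(
        int totalRelocating,          // all RELOCATING shards in the cluster
        int frozenRelocating,         // RELOCATING shards on dedicated frozen nodes
        int concurrentRebalance,      // cluster_concurrent_rebalance (-1 = unlimited)
        int concurrentFrozenRebalance // cluster_concurrent_frozen_rebalance (-1 = unlimited)
    ) {
        boolean normalUnlimited = concurrentRebalance == -1;
        boolean frozenUnlimited = concurrentFrozenRebalance == -1;
        if (normalUnlimited && frozenUnlimited) {
            return RebalanceVerdict.YES;
        }
        // Split the totals: frozen relocations are counted against their own limit only.
        int nonFrozenRelocating = totalRelocating - frozenRelocating;
        if (normalUnlimited == false && nonFrozenRelocating >= concurrentRebalance) {
            return RebalanceVerdict.THROTTLE;
        }
        if (frozenUnlimited == false && frozenRelocating >= concurrentFrozenRebalance) {
            return RebalanceVerdict.THROTTLE;
        }
        return RebalanceVerdict.YES;
    }
}
```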
ConcurrentRebalanceRoutingTests.java
|
@@ -16,11 +16,14 @@ | |
import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.metadata.Metadata; | ||
import org.elasticsearch.cluster.node.DiscoveryNodeRole; | ||
import org.elasticsearch.cluster.node.DiscoveryNodes; | ||
import org.elasticsearch.cluster.routing.RoutingTable; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.IndexVersion; | ||
|
||
import java.util.Collections; | ||
|
||
import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; | ||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; | ||
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; | ||
|
@@ -32,18 +35,41 @@ | |
public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { | ||
|
||
public void testClusterConcurrentRebalance() { | ||
AllocationService strategy = createAllocationService( | ||
Settings.builder() | ||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) | ||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) | ||
.build() | ||
); | ||
boolean testFrozen = randomBoolean(); | ||
|
||
AllocationService strategy; | ||
if (testFrozen) { | ||
strategy = createAllocationService( | ||
Settings.builder() | ||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) | ||
.put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 3) | ||
.build() | ||
); | ||
} else { | ||
strategy = createAllocationService( | ||
Settings.builder() | ||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) | ||
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) | ||
.build() | ||
); | ||
} | ||
|
||
logger.info("Building initial routing table"); | ||
|
||
Metadata metadata = Metadata.builder() | ||
.put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) | ||
.build(); | ||
Metadata metadata; | ||
if (testFrozen) { | ||
metadata = Metadata.builder() | ||
.put( | ||
IndexMetadata.builder("test") | ||
.settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) | ||
.numberOfShards(5) | ||
.numberOfReplicas(1) | ||
) | ||
.build(); | ||
} else { | ||
metadata = Metadata.builder() | ||
.put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) | ||
.build(); | ||
} | ||
|
||
RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) | ||
.addAsNew(metadata.getProject().index("test")) | ||
|
@@ -61,9 +87,19 @@ public void testClusterConcurrentRebalance() { | |
} | ||
|
||
logger.info("start two nodes and fully start the shards"); | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) | ||
.build(); | ||
if (testFrozen) { | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes( | ||
DiscoveryNodes.builder() | ||
.add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
) | ||
.build(); | ||
} else { | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) | ||
.build(); | ||
} | ||
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); | ||
|
||
for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { | ||
|
@@ -82,19 +118,35 @@ public void testClusterConcurrentRebalance() { | |
} | ||
|
||
logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes( | ||
DiscoveryNodes.builder(clusterState.nodes()) | ||
.add(newNode("node3")) | ||
.add(newNode("node4")) | ||
.add(newNode("node5")) | ||
.add(newNode("node6")) | ||
.add(newNode("node7")) | ||
.add(newNode("node8")) | ||
.add(newNode("node9")) | ||
.add(newNode("node10")) | ||
) | ||
.build(); | ||
if (testFrozen) { | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes( | ||
DiscoveryNodes.builder(clusterState.nodes()) | ||
.add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
) | ||
.build(); | ||
} else { | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes( | ||
DiscoveryNodes.builder(clusterState.nodes()) | ||
.add(newNode("node3")) | ||
.add(newNode("node4")) | ||
.add(newNode("node5")) | ||
.add(newNode("node6")) | ||
.add(newNode("node7")) | ||
.add(newNode("node8")) | ||
.add(newNode("node9")) | ||
.add(newNode("node10")) | ||
) | ||
.build(); | ||
} | ||
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); | ||
|
||
for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { | ||
|
@@ -131,4 +183,76 @@ public void testClusterConcurrentRebalance() { | |
assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(10)); | ||
assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(0)); | ||
} | ||
|
||
public void testClusterConcurrentRebalanceFrozenUnlimited() { | ||
AllocationService strategy = createAllocationService( | ||
Settings.builder() | ||
.put("cluster.routing.allocation.node_concurrent_recoveries", 10) | ||
.put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) | ||
.build() | ||
Review comment: Would it be better to set non-frozen recoveries to something very restrictive (like 1) so it's clear those limits are not in play here?
Review comment: nudge on this one, could set it to zero maybe to disable non-frozen movement altogether? |
||
); | ||
|
||
logger.info("Building initial routing table"); | ||
|
||
Metadata metadata = Metadata.builder() | ||
.put( | ||
IndexMetadata.builder("test") | ||
.settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) | ||
.numberOfShards(5) | ||
.numberOfReplicas(1) | ||
) | ||
.build(); | ||
|
||
RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) | ||
.addAsNew(metadata.getProject().index("test")) | ||
.build(); | ||
|
||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); | ||
|
||
assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); | ||
for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { | ||
assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); | ||
assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); | ||
assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); | ||
assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); | ||
assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); | ||
|
||
} | ||
|
||
logger.info("start two nodes and fully start the shards"); | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes( | ||
DiscoveryNodes.builder() | ||
.add(newNode("node1", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node2", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
) | ||
.build(); | ||
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); | ||
|
||
logger.info("start all the primary shards, replicas will start initializing"); | ||
clusterState = startInitializingShardsAndReroute(strategy, clusterState); | ||
|
||
logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); | ||
clusterState = ClusterState.builder(clusterState) | ||
.nodes( | ||
DiscoveryNodes.builder(clusterState.nodes()) | ||
.add(newNode("node3", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node4", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node5", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node6", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node7", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node8", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node9", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
.add(newNode("node10", Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE))) | ||
|
||
) | ||
.build(); | ||
|
||
|
||
clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); | ||
|
||
logger.info("start the replica shards, rebalancing should start, but without a limit 8 should be rebalancing"); | ||
clusterState = startInitializingShardsAndReroute(strategy, clusterState); | ||
|
||
// with no frozen limit, 8 shards migrate off node1/node2 to spread shards evenly across the 10 nodes | ||
Review comment: Nit: comment wording? perhaps mention that 8 will be migrating off to spread shards evenly |
||
assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(2)); | ||
assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(8)); | ||
} | ||
} |
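Following the review discussion above, a possible variant of the settings block in testClusterConcurrentRebalanceFrozenUnlimited that makes the non-frozen limit very restrictive, so it is clear only the frozen limit is in play. A value of 1 is used rather than 0: as the decider is written in this PR, a limit of 0 would satisfy relocatingShards >= clusterConcurrentRebalance even with zero non-frozen relocations, and since the check is not shard-specific it would throttle every rebalance decision, frozen ones included.

```java
// Sketch of the reviewer-suggested setup: restrict non-frozen rebalancing to 1
// so the 8 observed relocations can only be attributed to the unlimited frozen limit.
AllocationService strategy = createAllocationService(
    Settings.builder()
        .put("cluster.routing.allocation.node_concurrent_recoveries", 10)
        .put("cluster.routing.allocation.cluster_concurrent_rebalance", 1)          // very restrictive non-frozen limit
        .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1)  // unlimited frozen rebalancing
        .build()
);
```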