diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index ea61081d02ebe..12e10fc5ae04e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -88,6 +88,8 @@ public class RoutingNodes implements Iterable { private int relocatingShards = 0; + private int relocatingFrozenShards = 0; + private final Map> attributeValuesByAttribute; private final Map recoveriesPerNode; @@ -152,6 +154,9 @@ private RoutingNodes(GlobalRoutingTable routingTable, DiscoveryNodes discoveryNo assignedShardsAdd(shard); if (shard.relocating()) { relocatingShards++; + if (isDedicatedFrozenNode(shard.currentNodeId())) { + relocatingFrozenShards++; + } ShardRouting targetShardRouting = shard.getTargetRelocatingShard(); addInitialRecovery(targetShardRouting, indexShard.primary); // LinkedHashMap to preserve order. 
@@ -192,6 +197,7 @@ private RoutingNodes(RoutingNodes routingNodes) { this.inactivePrimaryCount = routingNodes.inactivePrimaryCount; this.inactiveShardCount = routingNodes.inactiveShardCount; this.relocatingShards = routingNodes.relocatingShards; + this.relocatingFrozenShards = routingNodes.relocatingFrozenShards; this.attributeValuesByAttribute = Collections.synchronizedMap(Maps.copyOf(routingNodes.attributeValuesByAttribute, HashSet::new)); this.recoveriesPerNode = Maps.copyOf(routingNodes.recoveriesPerNode, Recoveries::copy); } @@ -343,6 +349,18 @@ public int getRelocatingShardCount() { return relocatingShards; } + public boolean isDedicatedFrozenNode(String nodeId) { + RoutingNode node = nodesToShards.get(nodeId); + if (node != null && node.node() != null && node.node().isDedicatedFrozenNode()) { + return true; + } + return false; + } + + public int getRelocatingFrozenShardCount() { + return relocatingFrozenShards; + } + /** * Returns all shards that are not in the state UNASSIGNED with the same shard * ID as the given shard. 
@@ -478,6 +496,9 @@ public Tuple relocateShard( ) { ensureMutable(); relocatingShards++; + if (isDedicatedFrozenNode(nodeId)) { + relocatingFrozenShards++; + } ShardRouting source = startedShard.relocate(nodeId, expectedShardSize); ShardRouting target = source.getTargetRelocatingShard(); updateAssigned(startedShard, source); @@ -726,6 +747,9 @@ private ShardRouting started(ShardRouting shard, long expectedShardSize) { */ private ShardRouting cancelRelocation(ShardRouting shard) { relocatingShards--; + if (isDedicatedFrozenNode(shard.currentNodeId())) { + relocatingFrozenShards--; + } ShardRouting cancelledShard = shard.cancelRelocation(); updateAssigned(shard, cancelledShard); return cancelledShard; @@ -881,6 +905,7 @@ public boolean equals(Object o) { && inactivePrimaryCount == that.inactivePrimaryCount && inactiveShardCount == that.inactiveShardCount && relocatingShards == that.relocatingShards + && relocatingFrozenShards == that.relocatingFrozenShards && nodesToShards.equals(that.nodesToShards) && unassignedShards.equals(that.unassignedShards) && assignedShards.equals(that.assignedShards) @@ -898,6 +923,7 @@ public int hashCode() { inactivePrimaryCount, inactiveShardCount, relocatingShards, + relocatingFrozenShards, attributeValuesByAttribute, recoveriesPerNode ); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java index deb3e4440f4ab..cbc515cb82a3b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ConcurrentRebalanceAllocationDecider.java @@ -20,13 +20,15 @@ /** * Similar to the {@link ClusterRebalanceAllocationDecider} this * {@link AllocationDecider} controls the number of currently in-progress - * re-balance 
(relocation) operations and restricts node allocations if the - * configured threshold is reached. The default number of concurrent rebalance - * operations is set to {@code 2} + * re-balance (shard relocation) operations and restricts node allocations + * if the configured threshold is reached. Frozen and non-frozen shards are + * considered separately. The default number of concurrent rebalance operations + * is set to {@code 2}; the frozen limit falls back to the non-frozen setting when unspecified. *

* Re-balance operations can be controlled in real-time via the cluster update API using - * {@code cluster.routing.allocation.cluster_concurrent_rebalance}. Iff this - * setting is set to {@code -1} the number of concurrent re-balance operations + * {@code cluster.routing.allocation.cluster_concurrent_rebalance} and + * {@code cluster.routing.allocation.cluster_concurrent_frozen_rebalance}. + * When either setting is {@code -1}, the corresponding concurrent re-balance operations * are unlimited. */ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { @@ -44,21 +46,91 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider { ); private volatile int clusterConcurrentRebalance; + /** + * Same as cluster_concurrent_rebalance, but applies separately to frozen tier shards + * + * Defaults to the same value as normal concurrent rebalance, if unspecified + */ + public static final Setting CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING = Setting.intSetting( + "cluster.routing.allocation.cluster_concurrent_frozen_rebalance", + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, + -1, + Property.Dynamic, + Property.NodeScope + ); + private volatile int clusterConcurrentFrozenRebalance; + public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) { clusterSettings.initializeAndWatch( CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, this::setClusterConcurrentRebalance ); - logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance); + clusterSettings.initializeAndWatch( + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, + this::setClusterConcurrentFrozenRebalance + ); + logger.debug( + "using [cluster_concurrent_rebalance] with [concurrent_rebalance={}, concurrent_frozen_rebalance={}]", + clusterConcurrentRebalance, + clusterConcurrentFrozenRebalance + ); } private void setClusterConcurrentRebalance(int 
concurrentRebalance) { clusterConcurrentRebalance = concurrentRebalance; } + private void setClusterConcurrentFrozenRebalance(int concurrentFrozenRebalance) { + clusterConcurrentFrozenRebalance = concurrentFrozenRebalance; + } + @Override public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { - return canRebalance(allocation); + int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); + if (allocation.routingNodes().isDedicatedFrozenNode(shardRouting.currentNodeId())) { + if (clusterConcurrentFrozenRebalance == -1) { + return allocation.decision(Decision.YES, NAME, "unlimited concurrent frozen rebalances are allowed"); + } + if (relocatingFrozenShards >= clusterConcurrentFrozenRebalance) { + return allocation.decision( + Decision.THROTTLE, + NAME, + "reached the limit of concurrently rebalancing frozen shards [%d], cluster setting [%s=%d]", + relocatingFrozenShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), + clusterConcurrentFrozenRebalance + ); + } + return allocation.decision( + Decision.YES, + NAME, + "below threshold [%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", + clusterConcurrentFrozenRebalance, + relocatingFrozenShards + ); + } else { + int relocatingShards = allocation.routingNodes().getRelocatingShardCount() - relocatingFrozenShards; + if (clusterConcurrentRebalance == -1) { + return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); + } + if (relocatingShards >= clusterConcurrentRebalance) { + return allocation.decision( + Decision.THROTTLE, + NAME, + "reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]", + relocatingShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), + clusterConcurrentRebalance + ); + } + return allocation.decision( + Decision.YES, + NAME, + "below threshold [%d] for concurrent rebalances, current 
rebalance shard count [%d]", + clusterConcurrentRebalance, + relocatingShards + ); + } } /** @@ -68,33 +140,52 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca */ @Override public Decision canRebalance(RoutingAllocation allocation) { + int relocatingFrozenShards = allocation.routingNodes().getRelocatingFrozenShardCount(); int relocatingShards = allocation.routingNodes().getRelocatingShardCount(); if (allocation.isSimulating() && relocatingShards >= 2) { // BalancedShardAllocator is prone to perform unnecessary moves when cluster_concurrent_rebalance is set to high values (>2). // (See https://github.com/elastic/elasticsearch/issues/87279) // Above allocator is used in DesiredBalanceComputer. Since we do not move actual shard data during calculation // it is possible to artificially set above setting to 2 to avoid unnecessary moves in desired balance. + // Separately: keep overall limit in simulation to two including frozen shards return allocation.decision(Decision.THROTTLE, NAME, "allocation should move one shard at the time when simulating"); } - if (clusterConcurrentRebalance == -1) { - return allocation.decision(Decision.YES, NAME, "unlimited concurrent rebalances are allowed"); - } - if (relocatingShards >= clusterConcurrentRebalance) { + + // separate into frozen/non-frozen counts + relocatingShards = relocatingShards - relocatingFrozenShards; + + // either frozen or non-frozen having some allowance before their limit means the allocator has room to rebalance + if (clusterConcurrentRebalance == -1 || relocatingShards < clusterConcurrentRebalance) { return allocation.decision( - Decision.THROTTLE, + Decision.YES, NAME, - "reached the limit of concurrently rebalancing shards [%d], cluster setting [%s=%d]", - relocatingShards, + "below threshold [%s=%d] for concurrent rebalances, current rebalance shard count [%d]", CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), - clusterConcurrentRebalance + 
clusterConcurrentRebalance, + relocatingShards + ); + } + if (clusterConcurrentFrozenRebalance == -1 || relocatingFrozenShards < clusterConcurrentFrozenRebalance) { + return allocation.decision( + Decision.YES, + NAME, + "below threshold [%s=%d] for concurrent frozen rebalances, current frozen rebalance shard count [%d]", + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), + clusterConcurrentFrozenRebalance, + relocatingFrozenShards ); } return allocation.decision( - Decision.YES, + Decision.THROTTLE, NAME, - "below threshold [%d] for concurrent rebalances, current rebalance shard count [%d]", + "reached the limit of concurrently rebalancing shards [%d] for concurrent rebalances, cluster setting [%s=%d], " + + "and [%d] for concurrent frozen rebalances, frozen cluster setting [%s=%d]", + relocatingShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), clusterConcurrentRebalance, - relocatingShards + relocatingFrozenShards, + CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING.getKey(), + clusterConcurrentFrozenRebalance ); } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 7006b5adbe886..cdd5f835c72d1 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -238,6 +238,7 @@ public void apply(Settings value, Settings current, Settings previous) { BreakerSettings.CIRCUIT_BREAKER_TYPE, ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, + ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_FROZEN_REBALANCE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, 
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java new file mode 100644 index 0000000000000..d7c51f9fa40fe --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceAllocationDeciderTests.java @@ -0,0 +1,340 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.Decision; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +/** + * A series of tests that set up cluster state with a specified number of relocations in progress. + * + * Then sets and checks the ConcurrentRebalanceAllocationDecider canRebalance logic, at both the cluster- + * and shard-level. 
+ */ +public class ConcurrentRebalanceAllocationDeciderTests extends ESAllocationTestCase { + + public void testConcurrentUnlimited() { + ClusterState clusterState = setupConcurrentRelocations(5); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 0) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_rebalance=-1] for concurrent rebalances, " + + "current rebalance shard count [5]" + ); + assertShardAllocationDecision(clusterState, settings, Decision.Type.YES, "unlimited concurrent rebalances are allowed"); + + settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 6) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 0) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_rebalance=6] for concurrent rebalances, " + + "current rebalance shard count [5]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [6] for concurrent rebalances, current rebalance shard count [5]" + ); + } + + public void testFrozenConcurrentUnlimited() { + ClusterState clusterState = setupConcurrentFrozenRelocations(5); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=-1] for concurrent frozen rebalances, " + + "current frozen rebalance shard count [5]" + ); + assertShardAllocationDecision(clusterState, settings, 
Decision.Type.YES, "unlimited concurrent frozen rebalances are allowed"); + + settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 6) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=6] for concurrent frozen rebalances, " + + "current frozen rebalance shard count [5]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.YES, + "below threshold [6] for concurrent frozen rebalances, current frozen rebalance shard count [5]" + ); + } + + public void testThrottleDecision() { + ClusterState clusterState = setupConcurrentRelocations(2); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 2) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 0) + .build(); + assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing shards [2] for concurrent rebalances, " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_rebalance=2], " + + "and [0] for concurrent frozen rebalances, frozen cluster setting " + + "[cluster.routing.allocation.cluster_concurrent_frozen_rebalance=0]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing shards [2], " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_rebalance=2]" + ); + } + + public void testFrozenThrottleDecision() { + ClusterState clusterState = setupConcurrentFrozenRelocations(2); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 0) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 2) + .build(); + 
assertClusterAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing shards [0] for concurrent rebalances, " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_rebalance=0], " + + "and [2] for concurrent frozen rebalances, " + + "frozen cluster setting [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=2]" + ); + assertShardAllocationDecision( + clusterState, + settings, + Decision.Type.THROTTLE, + "reached the limit of concurrently rebalancing frozen shards [2], " + + "cluster setting [cluster.routing.allocation.cluster_concurrent_frozen_rebalance=2]" + ); + } + + private void assertShardAllocationDecision( + ClusterState clusterState, + Settings settings, + Decision.Type decisionType, + String explanation + ) { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); + + ConcurrentRebalanceAllocationDecider decider = new ConcurrentRebalanceAllocationDecider(clusterSettings); + + RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Arrays.asList(decider)), + null, + clusterState, + null, + null, + 0 + ); + allocation.debugDecision(true); + + ShardRouting shardRouting = findStartedShard(clusterState); + Decision decision = decider.canRebalance(shardRouting, allocation); + + assertThat(decision.type(), equalTo(decisionType)); + assertThat(decision.getExplanation(), containsString(explanation)); + } + + private ShardRouting findStartedShard(ClusterState clusterState) { + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + Map> assignedShards = routingNodes.getAssignedShards(); + for (Map.Entry> entry : assignedShards.entrySet()) { + for (ShardRouting shardRouting : entry.getValue()) { + if (shardRouting.state() == ShardRoutingState.STARTED) { + return shardRouting; + } + } + } + assert false : "need at least one started shard"; + return null; + } + + private void assertClusterAllocationDecision( + 
ClusterState clusterState, + Settings settings, + Decision.Type decisionType, + String explanation + ) { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings); + + ConcurrentRebalanceAllocationDecider decider = new ConcurrentRebalanceAllocationDecider(clusterSettings); + + RoutingAllocation allocation = new RoutingAllocation( + new AllocationDeciders(Arrays.asList(decider)), + null, + clusterState, + null, + null, + 0 + ); + allocation.debugDecision(true); + + Decision decision = decider.canRebalance(allocation); + + assertThat(decision.type(), equalTo(decisionType)); + assertThat(decision.getExplanation(), containsString(explanation)); + } + + private Metadata initializeMetadata(int numberOfRelocations) { + return Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current())) + .numberOfShards(numberOfRelocations) + .numberOfReplicas(1) + ) + .build(); + } + + private Supplier nodeFactory() { + return new Supplier() { + int count = 1; + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++); + } + }; + } + + private Metadata initializeFrozenMetadata(int numberOfRelocations) { + return Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(numberOfRelocations) + .numberOfReplicas(1) + ) + .build(); + } + + private Supplier frozenNodeFactory() { + return new Supplier() { + int count = 1; + Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++, frozenRole); + } + }; + } + + private ClusterState setupConcurrentRelocations(int relocations) { + return setupConcurrentRelocationsInternal(initializeMetadata(relocations), nodeFactory(), relocations); + } + + private ClusterState 
setupConcurrentFrozenRelocations(int relocations) { + return setupConcurrentRelocationsInternal(initializeFrozenMetadata(relocations), frozenNodeFactory(), relocations); + } + + /** + * Set up a cluster state so that a specified number of concurrent relocations are in progress + * + * Internally, this creates a bunch of shards (the number of relocations) and one replica, + * each on their own node. The primaries are all started, then the replicas are set up to all + * relocate to new nodes. + */ + private ClusterState setupConcurrentRelocationsInternal(Metadata metadata, Supplier nodeFactory, int relocations) { + assert relocations > 1 : "logic only works for 2 or more relocations as the replica needs to initialize elsewhere then relocate"; + + AllocationService allocationService = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build() + ); + + RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + .addAsNew(metadata.getProject().index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); + + var nodeBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < relocations; i++) { + nodeBuilder = nodeBuilder.add(nodeFactory.get()); + } + + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + + // set up primaries, and have replicas initializing + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + + // add a bunch of nodes to create relocation chaos + var clusterStateBuilder = ClusterState.builder(clusterState); + nodeBuilder = DiscoveryNodes.builder(clusterStateBuilder.nodes()); + for (int i = 0; i < relocations; i++) { 
+ nodeBuilder = nodeBuilder.add(nodeFactory.get()); + } + clusterState = clusterStateBuilder.nodes(nodeBuilder).build(); + + // start relocations + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + + return clusterState; + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index 472df77f1e06e..9547aeb7516e3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -16,11 +16,20 @@ import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; +import java.util.Collections; +import java.util.Set; +import java.util.function.Supplier; + import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -32,103 +41,234 @@ public class ConcurrentRebalanceRoutingTests extends ESAllocationTestCase { public void 
testClusterConcurrentRebalance() { - AllocationService strategy = createAllocationService( + AllocationService allocationService = createAllocationService( Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 1) .build() ); - logger.info("Building initial routing table"); - Metadata metadata = Metadata.builder() .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(5).numberOfReplicas(1)) .build(); + Supplier nodeFactory = new Supplier() { + int count = 1; + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++); + } + }; + + testClusterConcurrentInternal(allocationService, metadata, nodeFactory); + } + + public void testClusterConcurrentRebalanceFrozen() { + AllocationService allocationService = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 3) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 1) + .build() + ); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .build(); + + Supplier nodeFactory = new Supplier() { + int count = 1; + Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + + @Override + public DiscoveryNode get() { + return ESAllocationTestCase.newNode("node" + count++, frozenRole); + } + }; + + testClusterConcurrentInternal(allocationService, metadata, nodeFactory); + } + + /** + * Run a series of concurrent rebalance checks on an index as nodes are created and the index changes state. 
+ * Index must be named "test" + */ + void testClusterConcurrentInternal(AllocationService allocationService, Metadata metadata, Supplier nodeFactory) { RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) .addAsNew(metadata.getProject().index("test")) .build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); - assertThat(clusterState.routingTable().index("test").size(), equalTo(5)); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).state(), equalTo(UNASSIGNED)); - assertThat(clusterState.routingTable().index("test").shard(i).shard(0).currentNodeId(), nullValue()); - assertThat(clusterState.routingTable().index("test").shard(i).shard(1).currentNodeId(), nullValue()); + IndexRoutingTable indexRouting = clusterState.routingTable().index("test"); + assertThat(indexRouting.size(), equalTo(5)); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); } logger.info("start two nodes and fully start the shards"); clusterState = ClusterState.builder(clusterState) - .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))) + .nodes(DiscoveryNodes.builder().add(nodeFactory.get()).add(nodeFactory.get())) .build(); - clusterState = strategy.reroute(clusterState, "reroute", 
ActionListener.noop()); + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + indexRouting = clusterState.routingTable().index("test"); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(INITIALIZING)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(UNASSIGNED)); } logger.info("start all the primary shards, replicas will start initializing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + indexRouting = clusterState.routingTable().index("test"); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); } logger.info("now, start 8 more nodes, and check that no rebalancing/relocation have happened"); clusterState 
= ClusterState.builder(clusterState) .nodes( DiscoveryNodes.builder(clusterState.nodes()) - .add(newNode("node3")) - .add(newNode("node4")) - .add(newNode("node5")) - .add(newNode("node6")) - .add(newNode("node7")) - .add(newNode("node8")) - .add(newNode("node9")) - .add(newNode("node10")) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) + .add(nodeFactory.get()) ) .build(); - clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); - for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { - assertThat(clusterState.routingTable().index("test").shard(i).size(), equalTo(2)); - assertThat(clusterState.routingTable().index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(clusterState.routingTable().index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + + indexRouting = clusterState.routingTable().index("test"); + for (int i = 0; i < indexRouting.size(); i++) { + IndexShardRoutingTable shardRouting = indexRouting.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); } logger.info("start the replica shards, rebalancing should start, but, only 3 should be rebalancing"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(7)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), 
equalTo(3)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(7)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(3)); logger.info("finalize this session relocation, 3 more should relocate now"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(7)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(3)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(7)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(3)); logger.info("finalize this session relocation, 2 more should relocate now"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(8)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(2)); + assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(8)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(2)); logger.info("finalize this session relocation, no more relocation"); - clusterState = startInitializingShardsAndReroute(strategy, clusterState); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + routingNodes = clusterState.getRoutingNodes(); // we only allow one relocation at a time - assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(10)); - assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(0)); + 
assertThat(shardsWithState(routingNodes, STARTED).size(), equalTo(10)); + assertThat(shardsWithState(routingNodes, RELOCATING).size(), equalTo(0)); + } + + public void testClusterConcurrentRebalanceFrozenUnlimited() { + Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + + AllocationService allocationService = createAllocationService( + Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", -1) + .build() + ); + + logger.info("Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN)) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + .addAsNew(metadata.getProject().index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); + + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test"); + assertThat(indexRoutingTable.size(), equalTo(5)); + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + IndexShardRoutingTable shardRouting = indexRoutingTable.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1", frozenRole)).add(newNode("node2", frozenRole))) + .build(); + clusterState = 
allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + + logger.info("start all the primary shards, replicas will start initializing"); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + + var clusterStateBuilder = ClusterState.builder(clusterState); + + int nodeCount = randomIntBetween(8, 20); + var nodeBuilder = DiscoveryNodes.builder(clusterStateBuilder.nodes()); + + logger.info("now, start " + nodeCount + " more nodes, and check that no rebalancing/relocation have happened"); + + for (int i = 0; i < nodeCount; i++) { + int nodeId = 3 + i; + nodeBuilder = nodeBuilder.add(newNode("node" + nodeId, frozenRole)); + } + + clusterState = clusterStateBuilder.nodes(nodeBuilder).build(); + + clusterState = allocationService.reroute(clusterState, "reroute", ActionListener.noop()); + + logger.info("start the replica shards, rebalancing should start, but with a limit " + nodeCount + " should be rebalancing"); + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + + // the frozen rebalance limit is -1 (unlimited), so any number of relocations may run at a time + assertThat(shardsWithState(clusterState.getRoutingNodes(), STARTED).size(), equalTo(2)); + assertThat(shardsWithState(clusterState.getRoutingNodes(), RELOCATING).size(), equalTo(8)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java index 6b6ef65789c37..933649ca20c6e 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java @@ -53,6 +53,7 @@ import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import 
org.elasticsearch.snapshots.SnapshotsInfoService; import org.elasticsearch.test.ClusterServiceUtils; @@ -140,9 +141,19 @@ public static MockAllocationService createAllocationService( GatewayAllocator gatewayAllocator, ClusterInfoService clusterInfoService, SnapshotsInfoService snapshotsInfoService + ) { + return createAllocationService(settings, gatewayAllocator, clusterInfoService, snapshotsInfoService, Collections.emptyList()); + } + + public static MockAllocationService createAllocationService( + Settings settings, + GatewayAllocator gatewayAllocator, + ClusterInfoService clusterInfoService, + SnapshotsInfoService snapshotsInfoService, + List clusterPlugins ) { return new MockAllocationService( - randomAllocationDeciders(settings, createBuiltInClusterSettings(settings)), + randomAllocationDeciders(settings, createBuiltInClusterSettings(settings), clusterPlugins), gatewayAllocator, createShardsAllocator(settings), clusterInfoService, @@ -151,8 +162,16 @@ public static MockAllocationService createAllocationService( } public static AllocationDeciders randomAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + return randomAllocationDeciders(settings, clusterSettings, Collections.emptyList()); + } + + public static AllocationDeciders randomAllocationDeciders( + Settings settings, + ClusterSettings clusterSettings, + List clusterPlugins + ) { List deciders = new ArrayList<>( - ClusterModule.createAllocationDeciders(settings, clusterSettings, Collections.emptyList()) + ClusterModule.createAllocationDeciders(settings, clusterSettings, clusterPlugins) ); Collections.shuffle(deciders, random()); return new AllocationDeciders(deciders); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java index 787690d6f55e4..3caffe6ccb6b3 100644 --- 
a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java @@ -9,9 +9,12 @@ import joptsimple.internal.Strings; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.metadata.DesiredNode; import org.elasticsearch.cluster.metadata.DesiredNodeWithStatus; import org.elasticsearch.cluster.metadata.DesiredNodes; @@ -25,11 +28,14 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GlobalRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingNodesHelper; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DataTier; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -45,6 +51,8 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; +import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.elasticsearch.xpack.core.XPackPlugin; import java.util.ArrayList; 
import java.util.Arrays; @@ -57,6 +65,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState; +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; +import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_COLD; import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_FROZEN; import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; @@ -64,6 +77,7 @@ import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; public class DataTierAllocationDeciderTests extends ESAllocationTestCase { @@ -1027,4 +1041,153 @@ private NodesShutdownMetadata randomRestartInCluster(Set currentNodes) { ); } + public void testClusterConcurrentRebalanceIndependentLimits() { + final Set hotRole = Collections.singleton(DiscoveryNodeRole.DATA_HOT_NODE_ROLE); + final Set frozenRole = Collections.singleton(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + + Settings settings = Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) + .put("cluster.routing.allocation.cluster_concurrent_frozen_rebalance", 7) + .build(); + + AllocationService strategy = createAllocationService( + settings, + new TestGatewayAllocator(), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES, + Arrays.asList(new XPackPlugin(settings)) + ); + + logger.info("Building initial routing table"); + Metadata metadata = 
Metadata.builder() + .put( + IndexMetadata.builder("test") + .settings(settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_HOT)) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .put( + IndexMetadata.builder("test_frozen") + .settings( + settings(IndexVersion.current()).put(DataTier.TIER_PREFERENCE, DataTier.DATA_FROZEN) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE) + .put(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, true) + ) + .numberOfShards(5) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY) + .addAsNew(metadata.getProject().index("test")) + .addAsNew(metadata.getProject().index("test_frozen")) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(initialRoutingTable).build(); + + IndexRoutingTable index = clusterState.routingTable().index("test"); + assertThat(index.size(), equalTo(5)); + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + IndexShardRoutingTable shardRouting = index.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); + } + + IndexRoutingTable frozenIndex = clusterState.routingTable().index("test_frozen"); + assertThat(frozenIndex.size(), equalTo(5)); + for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { + IndexShardRoutingTable shardRouting = frozenIndex.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.shard(0).state(), equalTo(UNASSIGNED)); + assertThat(shardRouting.shard(1).state(), equalTo(UNASSIGNED)); + 
assertThat(shardRouting.shard(0).currentNodeId(), nullValue()); + assertThat(shardRouting.shard(1).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(newNode("node1", hotRole)) + .add(newNode("node2", hotRole)) + .add(newNode("node1_frozen", frozenRole)) + .add(newNode("node2_frozen", frozenRole)) + ) + .build(); + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + + index = clusterState.routingTable().index("test"); + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + IndexShardRoutingTable shardRouting = index.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(INITIALIZING)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + frozenIndex = clusterState.routingTable().index("test_frozen"); + for (int i = 0; i < clusterState.routingTable().index("test_frozen").size(); i++) { + IndexShardRoutingTable shardRouting = frozenIndex.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(INITIALIZING)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("start all the primary shards, replicas will start initializing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + index = clusterState.routingTable().index("test"); + for (int i = 0; i < clusterState.routingTable().index("test").size(); i++) { + IndexShardRoutingTable shardRouting = index.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + frozenIndex = clusterState.routingTable().index("test_frozen"); + for (int i = 0; i < 
clusterState.routingTable().index("test_frozen").size(); i++) { + IndexShardRoutingTable shardRouting = frozenIndex.shard(i); + assertThat(shardRouting.size(), equalTo(2)); + assertThat(shardRouting.primaryShard().state(), equalTo(STARTED)); + assertThat(shardRouting.replicaShards().get(0).state(), equalTo(INITIALIZING)); + } + + logger.info("now, start 8 more frozen and hot nodes"); + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder(clusterState.nodes()) + .add(newNode("node3", hotRole)) + .add(newNode("node4", hotRole)) + .add(newNode("node5", hotRole)) + .add(newNode("node6", hotRole)) + .add(newNode("node7", hotRole)) + .add(newNode("node8", hotRole)) + .add(newNode("node9", hotRole)) + .add(newNode("node10", hotRole)) + .add(newNode("node3_frozen", frozenRole)) + .add(newNode("node4_frozen", frozenRole)) + .add(newNode("node5_frozen", frozenRole)) + .add(newNode("node6_frozen", frozenRole)) + .add(newNode("node7_frozen", frozenRole)) + .add(newNode("node8_frozen", frozenRole)) + .add(newNode("node9_frozen", frozenRole)) + .add(newNode("node10_frozen", frozenRole)) + ) + .build(); + + clusterState = strategy.reroute(clusterState, "reroute", ActionListener.noop()); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + logger.info("Hot should be able to relocate its max of 3 shards, and frozen its max of 7 shards"); + + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + assertThat(shardsWithState(routingNodes, "test", STARTED).size(), equalTo(7)); + assertThat(shardsWithState(routingNodes, "test", RELOCATING).size(), equalTo(3)); + + assertThat(shardsWithState(routingNodes, "test_frozen", STARTED).size(), equalTo(3)); + assertThat(shardsWithState(routingNodes, "test_frozen", RELOCATING).size(), equalTo(7)); + } }