|
| 1 | +/* |
| 2 | + * SPDX-License-Identifier: Apache-2.0 |
| 3 | + * |
| 4 | + * The OpenSearch Contributors require contributions made to |
| 5 | + * this file be licensed under the Apache-2.0 license or a |
| 6 | + * compatible open source license. |
| 7 | + */ |
| 8 | + |
| 9 | +package org.opensearch.cluster.routing.allocation; |
| 10 | + |
| 11 | +import org.opensearch.Version; |
| 12 | +import org.opensearch.cluster.ClusterName; |
| 13 | +import org.opensearch.cluster.ClusterState; |
| 14 | +import org.opensearch.cluster.OpenSearchAllocationTestCase; |
| 15 | +import org.opensearch.cluster.metadata.IndexMetadata; |
| 16 | +import org.opensearch.cluster.metadata.Metadata; |
| 17 | +import org.opensearch.cluster.node.DiscoveryNodeRole; |
| 18 | +import org.opensearch.cluster.node.DiscoveryNodes; |
| 19 | +import org.opensearch.cluster.routing.RoutingNode; |
| 20 | +import org.opensearch.cluster.routing.RoutingTable; |
| 21 | +import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; |
| 22 | +import org.opensearch.common.settings.ClusterSettings; |
| 23 | +import org.opensearch.common.settings.Settings; |
| 24 | +import org.opensearch.index.IndexModule; |
| 25 | + |
| 26 | +import java.util.Set; |
| 27 | +import java.util.stream.StreamSupport; |
| 28 | + |
| 29 | +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; |
| 30 | + |
| 31 | +/** |
| 32 | + * Tests allocation convergence in a tight-capacity scenario: 3 nodes, cluster limit of 6 shards/node |
| 33 | + * (18 total slots), 3 indices totaling 20 shards where one index has an index-level limit of 1 shard/node. |
| 34 | + * Expected result: 17 assigned shards, 3 unassigned (the constrained index's replicas). |
| 35 | + * <p> |
| 36 | + * This requires the balancer to redistribute shards optimally. LocalShardsBalancer handles this via |
| 37 | + * weight-based rebalancing. RemoteShardsBalancer's balance() only considers primary counts, so it |
| 38 | + * cannot always converge. |
| 39 | + */ |
| 40 | +public class ShardsBalancerTightCapacityTests extends OpenSearchAllocationTestCase { |
| 41 | + /** |
| 42 | + * LocalShardsBalancer converges to 17 assigned shards via weight-based total-shard rebalancing. |
| 43 | + */ |
| 44 | + public void testTightCapacityConvergenceWithLocalShards() { |
| 45 | + int assignedShards = runTightCapacityScenario(false); |
| 46 | + assertEquals("LocalShardsBalancer should converge to 17 assigned shards", 17, assignedShards); |
| 47 | + } |
| 48 | + |
| 49 | + /** |
| 50 | + * RemoteShardsBalancer may settle at 16 assigned shards because its balance() only rebalances |
| 51 | + * by primary count, not total shard count. When this is fixed, remove the {@code @AwaitsFix}. |
| 52 | + */ |
| 53 | + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/19726#issuecomment-4049069484") |
| 54 | + public void testTightCapacityConvergenceWithRemoteShards() { |
| 55 | + int assignedShards = runTightCapacityScenario(true); |
| 56 | + assertEquals("RemoteShardsBalancer should converge to 17 assigned shards", 17, assignedShards); |
| 57 | + } |
| 58 | + |
| 59 | + /** |
| 60 | + * @param remote true for remote-capable nodes/indices (RemoteShardsBalancer), false for local (LocalShardsBalancer) |
| 61 | + * @return number of assigned shards after allocation converges |
| 62 | + */ |
| 63 | + private int runTightCapacityScenario(boolean remote) { |
| 64 | + final String clusterLimitKey = remote |
| 65 | + ? ShardsLimitAllocationDecider.CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey() |
| 66 | + : ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(); |
| 67 | + final Settings settings = Settings.builder() |
| 68 | + .put("cluster.routing.allocation.node_concurrent_recoveries", 10) |
| 69 | + .put(clusterLimitKey, 6) |
| 70 | + .build(); |
| 71 | + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); |
| 72 | + final AllocationService strategy = createAllocationService(settings, clusterSettings, random()); |
| 73 | + |
| 74 | + final String indexLimitKey = remote |
| 75 | + ? ShardsLimitAllocationDecider.INDEX_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey() |
| 76 | + : ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(); |
| 77 | + |
| 78 | + // test1: 3p+3r with index limit 1/node, test2: 4p+4r, test3: 3p+3r = 20 total, 17 assignable |
| 79 | + final Metadata.Builder mb = Metadata.builder(); |
| 80 | + mb.put(IndexMetadata.builder("test1").settings(indexSettings(remote).put(indexLimitKey, 1)).numberOfShards(3).numberOfReplicas(1)); |
| 81 | + mb.put(IndexMetadata.builder("test2").settings(indexSettings(remote)).numberOfShards(4).numberOfReplicas(1)); |
| 82 | + mb.put(IndexMetadata.builder("test3").settings(indexSettings(remote)).numberOfShards(3).numberOfReplicas(1)); |
| 83 | + Metadata metadata = mb.build(); |
| 84 | + |
| 85 | + final RoutingTable routingTable = RoutingTable.builder() |
| 86 | + .addAsNew(metadata.index("test1")) |
| 87 | + .addAsNew(metadata.index("test2")) |
| 88 | + .addAsNew(metadata.index("test3")) |
| 89 | + .build(); |
| 90 | + |
| 91 | + final Set<DiscoveryNodeRole> roles = remote |
| 92 | + ? Set.of(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.WARM_ROLE) |
| 93 | + : Set.of(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); |
| 94 | + final DiscoveryNodes.Builder nb = DiscoveryNodes.builder(); |
| 95 | + for (int i = 0; i < 3; i++) { |
| 96 | + String id = "node-" + i; |
| 97 | + nb.add(newNode(id, id, roles)); |
| 98 | + } |
| 99 | + |
| 100 | + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) |
| 101 | + .metadata(metadata) |
| 102 | + .routingTable(routingTable) |
| 103 | + .nodes(nb) |
| 104 | + .build(); |
| 105 | + |
| 106 | + clusterState = allocateToConvergence(strategy, clusterState); |
| 107 | + |
| 108 | + return StreamSupport.stream(clusterState.getRoutingNodes().spliterator(), false).mapToInt(RoutingNode::numberOfOwningShards).sum(); |
| 109 | + } |
| 110 | + |
| 111 | + private Settings.Builder indexSettings(boolean remote) { |
| 112 | + final Settings.Builder sb = settings(Version.CURRENT); |
| 113 | + if (remote) { |
| 114 | + sb.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()); |
| 115 | + } |
| 116 | + return sb; |
| 117 | + } |
| 118 | + |
| 119 | + private ClusterState allocateToConvergence(AllocationService service, ClusterState clusterState) { |
| 120 | + clusterState = service.reroute(clusterState, "reroute"); |
| 121 | + int iterations = 0; |
| 122 | + while (clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() == false && iterations < 10) { |
| 123 | + clusterState = startInitializingShardsAndReroute(service, clusterState); |
| 124 | + iterations++; |
| 125 | + } |
| 126 | + assertTrue("Expected no shards to be INITIALIZING", clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()); |
| 127 | + return clusterState; |
| 128 | + } |
| 129 | +} |
0 commit comments