diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java index c9168e7618602..ee325d6a58efc 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -194,7 +196,13 @@ public void testLazyRolloverFailsIndexing() throws Exception { simpleUserClient.performRequest(createDocRequest); fail("Indexing should have failed."); } catch (ResponseException responseException) { - assertThat(responseException.getMessage(), containsString("this action would add [2] shards")); + assertThat( + responseException.getMessage(), + anyOf( + allOf(containsString("this action would add [2] shards"), containsString("maximum normal shards")), + allOf(containsString("this action would add [1] shards"), containsString("maximum index shards")) + ) + ); } updateClusterSettingsRequest = new Request("PUT", "_cluster/settings"); diff --git a/server/src/main/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorService.java b/server/src/main/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorService.java index 4da63ea9868db..f9c0c2bf2a536 100644 --- a/server/src/main/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorService.java +++ b/server/src/main/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorService.java @@ -10,6 +10,7 @@ package org.elasticsearch.health.node; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ReferenceDocs; @@ -25,8 +26,9 @@ import org.elasticsearch.health.metadata.HealthMetadata; import org.elasticsearch.indices.ShardLimitValidator; +import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; -import java.util.stream.Stream; /** * This indicator reports health data about the shard capacity across the cluster. @@ -45,8 +47,6 @@ public class ShardsCapacityHealthIndicatorServ static final String NAME = "shards_capacity"; - static final String DATA_NODE_NAME = "data"; - static final String FROZEN_NODE_NAME = "frozen"; private static final String UPGRADE_BLOCKED = "The cluster has too many used shards to be able to upgrade."; private static final String UPGRADE_AT_RISK = "The cluster is running low on room to add new shards.
Upgrading to a new version is at risk."; @@ -90,9 +90,11 @@ public class ShardsCapacityHealthIndicatorServ ); private final ClusterService clusterService; + private final List<ShardLimitValidator.LimitGroup> shardLimitGroups; public ShardsCapacityHealthIndicatorService(ClusterService clusterService) { this.clusterService = clusterService; + this.shardLimitGroups = ShardLimitValidator.applicableLimitGroups(DiscoveryNode.isStateless(clusterService.getSettings())); } @Override @@ -109,26 +111,23 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources } var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata(); - return mergeIndicators( - verbose, - calculateFrom( - shardLimitsMetadata.maxShardsPerNode(), - state.nodes(), - state.metadata(), - ShardLimitValidator::checkShardLimitForNormalNodes - ), - calculateFrom( - shardLimitsMetadata.maxShardsPerNodeFrozen(), - state.nodes(), - state.metadata(), - ShardLimitValidator::checkShardLimitForFrozenNodes + final List<StatusResult> statusResults = shardLimitGroups.stream() + .map( + limitGroup -> calculateFrom( + ShardLimitValidator.getShardLimitPerNode(limitGroup, shardLimitsMetadata), + state.nodes(), + state.metadata(), + limitGroup::checkShardLimit + ) ) - ); + .toList(); + + return mergeIndicators(verbose, statusResults); } - private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult dataNodes, StatusResult frozenNodes) { - var finalStatus = HealthStatus.merge(Stream.of(dataNodes.status, frozenNodes.status)); - var diagnoses = List.<Diagnosis>of(); + private HealthIndicatorResult mergeIndicators(boolean verbose, List<StatusResult> statusResults) { + var finalStatus = HealthStatus.merge(statusResults.stream().map(StatusResult::status)); + var diagnoses = new LinkedHashSet<Diagnosis>(); var symptomBuilder = new StringBuilder(); if (finalStatus == HealthStatus.GREEN) { @@ -139,19 +138,22 @@ private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult data // frozen* nodes, so we have to check each of the groups in order to provide the right message.
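When more than one limit group is unhealthy, the loop that follows joins the affected node-type names into a single symptom string. As a standalone illustration of the intended formatting (the helper name below is hypothetical, not part of the change):

```java
import java.util.List;

// Mirrors the symptom-building loop: one name as-is, two names joined with
// "and", three or more separated by commas with a final "and".
static String joinNodeTypeNames(List<String> names) {
    StringBuilder sb = new StringBuilder(names.getFirst());
    for (int i = 1; i < names.size() - 1; i++) {
        sb.append(", ").append(names.get(i));
    }
    if (names.size() > 1) {
        sb.append(" and ").append(names.getLast());
    }
    return sb.toString();
}

// joinNodeTypeNames(List.of("index"))                   -> "index"
// joinNodeTypeNames(List.of("index", "search"))         -> "index and search"
// joinNodeTypeNames(List.of("data", "index", "search")) -> "data, index and search"
```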
if (finalStatus.indicatesHealthProblem()) { symptomBuilder.append("Cluster is close to reaching the configured maximum number of shards for "); - if (dataNodes.status == frozenNodes.status) { - symptomBuilder.append(DATA_NODE_NAME).append(" and ").append(FROZEN_NODE_NAME); - diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_DATA_NODES, SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES); - - } else if (dataNodes.status.indicatesHealthProblem()) { - symptomBuilder.append(DATA_NODE_NAME); - diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_DATA_NODES); - - } else if (frozenNodes.status.indicatesHealthProblem()) { - symptomBuilder.append(FROZEN_NODE_NAME); - diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES); + final var nodeTypeNames = new ArrayList<String>(); + for (var statusResult : statusResults) { + if (statusResult.status.indicatesHealthProblem()) { + nodeTypeNames.add(nodeTypeForLimitGroup(statusResult.result.group())); + diagnoses.add(diagnosisForLimitGroup(statusResult.result.group())); + } } + assert nodeTypeNames.isEmpty() == false; + symptomBuilder.append(nodeTypeNames.getFirst()); + for (int i = 1; i < nodeTypeNames.size() - 1; i++) { + symptomBuilder.append(", ").append(nodeTypeNames.get(i)); + } + if (nodeTypeNames.size() > 1) { + symptomBuilder.append(" and ").append(nodeTypeNames.getLast()); + } symptomBuilder.append(" nodes."); } @@ -164,9 +166,9 @@ private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult data return createIndicator( finalStatus, symptomBuilder.toString(), - verbose ? buildDetails(dataNodes.result, frozenNodes.result) : HealthIndicatorDetails.EMPTY, + verbose ? buildDetails(statusResults.stream().map(StatusResult::result).toList()) : HealthIndicatorDetails.EMPTY, indicatorImpacts, - verbose ? diagnoses : List.of() + verbose ?
List.copyOf(diagnoses) : List.of() ); } @@ -189,22 +191,14 @@ static StatusResult calculateFrom( return new StatusResult(HealthStatus.GREEN, result); } - static HealthIndicatorDetails buildDetails(ShardLimitValidator.Result dataNodes, ShardLimitValidator.Result frozenNodes) { + static HealthIndicatorDetails buildDetails(List<ShardLimitValidator.Result> results) { return (builder, params) -> { builder.startObject(); - { - builder.startObject(DATA_NODE_NAME); - builder.field("max_shards_in_cluster", dataNodes.maxShardsInCluster()); - if (dataNodes.currentUsedShards().isPresent()) { - builder.field("current_used_shards", dataNodes.currentUsedShards().get()); - } - builder.endObject(); - } - { - builder.startObject("frozen"); - builder.field("max_shards_in_cluster", frozenNodes.maxShardsInCluster()); - if (frozenNodes.currentUsedShards().isPresent()) { - builder.field("current_used_shards", frozenNodes.currentUsedShards().get()); + for (var result : results) { + builder.startObject(nodeTypeForLimitGroup(result.group())); + builder.field("max_shards_in_cluster", result.maxShardsInCluster()); + if (result.currentUsedShards().isPresent()) { + builder.field("current_used_shards", result.currentUsedShards().get()); } builder.endObject(); } @@ -223,6 +217,22 @@ private HealthIndicatorResult unknownIndicator() { ); } + private static String nodeTypeForLimitGroup(ShardLimitValidator.LimitGroup limitGroup) { + return switch (limitGroup) { + case NORMAL -> "data"; + case FROZEN -> "frozen"; + case INDEX -> "index"; + case SEARCH -> "search"; + }; + } + + private static Diagnosis diagnosisForLimitGroup(ShardLimitValidator.LimitGroup limitGroup) { + return switch (limitGroup) { + case NORMAL, INDEX, SEARCH -> SHARDS_MAX_CAPACITY_REACHED_DATA_NODES; + case FROZEN -> SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES; + }; + } + record StatusResult(HealthStatus status, ShardLimitValidator.Result result) {} @FunctionalInterface diff --git a/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java b/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java index 6786ddb383e17..b57641edadd7f 100644 --- a/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java +++ b/server/src/main/java/org/elasticsearch/indices/ShardLimitValidator.java @@ -19,12 +19,18 @@ import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.health.metadata.HealthMetadata; import org.elasticsearch.index.Index; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; @@ -53,12 +59,12 @@ public class ShardLimitValidator { ); public static final String FROZEN_GROUP = "frozen"; public static final String NORMAL_GROUP = "normal"; - static final Set<String> VALID_GROUPS = Set.of(NORMAL_GROUP, FROZEN_GROUP); + private static final Set<String> VALID_INDEX_SETTING_GROUPS = Set.of(NORMAL_GROUP, FROZEN_GROUP); public static final Setting<String> INDEX_SETTING_SHARD_LIMIT_GROUP = Setting.simpleString( "index.shard_limit.group", NORMAL_GROUP, value -> { - if (VALID_GROUPS.contains(value) == false) { + if (VALID_INDEX_SETTING_GROUPS.contains(value) == false) { throw new IllegalArgumentException("[" + value + "] is not a valid shard
limit group"); } }, @@ -68,6 +74,7 @@ public class ShardLimitValidator { ); protected final AtomicInteger shardLimitPerNode = new AtomicInteger(); protected final AtomicInteger shardLimitPerNodeFrozen = new AtomicInteger(); + private final boolean isStateless; public ShardLimitValidator(final Settings settings, ClusterService clusterService) { this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings)); @@ -75,6 +82,7 @@ public ShardLimitValidator(final Settings settings, ClusterService clusterServic clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode); clusterService.getClusterSettings() .addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN, this::setShardLimitPerNodeFrozen); + this.isStateless = DiscoveryNode.isStateless(settings); } private void setShardLimitPerNode(int newValue) { @@ -85,12 +93,18 @@ private void setShardLimitPerNodeFrozen(int newValue) { this.shardLimitPerNodeFrozen.set(newValue); } - /** - * Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting. - * @return the current value of the setting - */ - public int getShardLimitPerNode() { - return shardLimitPerNode.get(); + private int getShardLimitPerNode(LimitGroup limitGroup) { + return switch (limitGroup) { + case NORMAL, INDEX, SEARCH -> shardLimitPerNode.get(); + case FROZEN -> shardLimitPerNodeFrozen.get(); + }; + } + + public static int getShardLimitPerNode(LimitGroup limitGroup, HealthMetadata.ShardLimits shardLimits) { + return switch (limitGroup) { + case NORMAL, INDEX, SEARCH -> shardLimits.maxShardsPerNode(); + case FROZEN -> shardLimits.maxShardsPerNodeFrozen(); + }; } /** @@ -102,17 +116,11 @@ public int getShardLimitPerNode() { * @throws ValidationException if creating this index would put the cluster over the cluster shard limit */ public void validateShardLimit(final Settings settings, final DiscoveryNodes discoveryNodes, final Metadata metadata) { - final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings); - final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings); - final int shardsToCreate = numberOfShards * (1 + numberOfReplicas); - final boolean frozen = FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(settings)); - - final var result = checkShardLimitOnBothGroups( - frozen == false ? shardsToCreate : 0, - frozen ? shardsToCreate : 0, - discoveryNodes, - metadata - ); + final var limitGroups = applicableLimitGroups(isStateless); + final var shardsToCreatePerGroup = limitGroups.stream() + .collect(Collectors.toUnmodifiableMap(Function.identity(), limitGroup -> limitGroup.newShardsTotal(settings))); + + final var result = checkShardLimitOnGroups(limitGroups, shardsToCreatePerGroup, discoveryNodes, metadata); if (result.canAddShards == false) { final ValidationException e = new ValidationException(); e.addValidationError(errorMessageFrom(result)); @@ -130,21 +138,20 @@ public void validateShardLimit(final Settings settings, final DiscoveryNodes dis * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled. 
*/ public void validateShardLimit(DiscoveryNodes discoveryNodes, Metadata metadata, Index[] indicesToOpen) { - int frozen = 0; - int normal = 0; + final var limitGroups = applicableLimitGroups(isStateless); + final Map<LimitGroup, Integer> shardsToCreatePerGroup = new HashMap<>(); + + // TODO: we can short circuit when indicesToOpen is empty for (Index index : indicesToOpen) { IndexMetadata imd = metadata.indexMetadata(index); if (imd.getState().equals(IndexMetadata.State.CLOSE)) { - int totalNewShards = imd.getNumberOfShards() * (1 + imd.getNumberOfReplicas()); - if (FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()))) { - frozen += totalNewShards; - } else { - normal += totalNewShards; - } + limitGroups.forEach( + limitGroup -> shardsToCreatePerGroup.merge(limitGroup, limitGroup.newShardsTotal(imd.getSettings()), Integer::sum) + ); } } - var result = checkShardLimitOnBothGroups(normal, frozen, discoveryNodes, metadata); + var result = checkShardLimitOnGroups(limitGroups, shardsToCreatePerGroup, discoveryNodes, metadata); if (result.canAddShards == false) { ValidationException ex = new ValidationException(); ex.addValidationError(errorMessageFrom(result)); @@ -153,19 +160,18 @@ public void validateShardLimit(DiscoveryNodes discoveryNodes, Metadata metadata, } public void validateShardLimitOnReplicaUpdate(DiscoveryNodes discoveryNodes, Metadata metadata, Index[] indices, int replicas) { - int frozen = 0; - int normal = 0; + final var limitGroups = applicableLimitGroups(isStateless); + final Map<LimitGroup, Integer> shardsToCreatePerGroup = new HashMap<>(); + + // TODO: we can short circuit when indices is empty for (Index index : indices) { IndexMetadata imd = metadata.indexMetadata(index); - int totalNewShards = getTotalNewShards(imd, replicas); - if (FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings()))) { - frozen += totalNewShards; - } else { - normal += totalNewShards; - } + limitGroups.forEach( + limitGroup -> shardsToCreatePerGroup.merge(limitGroup, limitGroup.newShardsTotal(imd.getSettings(), replicas), Integer::sum) + ); } - var result = checkShardLimitOnBothGroups(normal, frozen, discoveryNodes, metadata); + var result = checkShardLimitOnGroups(limitGroups, shardsToCreatePerGroup, discoveryNodes, metadata); if (result.canAddShards == false) { ValidationException ex = new ValidationException(); ex.addValidationError(errorMessageFrom(result)); @@ -173,118 +179,49 @@ public void validateShardLimitOnReplicaUpdate(DiscoveryNodes discoveryNodes, Met } } - private static int getTotalNewShards(IndexMetadata indexMetadata, int updatedNumberOfReplicas) { - int shardsInIndex = indexMetadata.getNumberOfShards(); - int oldNumberOfReplicas = indexMetadata.getNumberOfReplicas(); - int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas; - return replicaIncrease * shardsInIndex; + public static List<LimitGroup> applicableLimitGroups(boolean isStateless) { + return isStateless ? List.of(LimitGroup.INDEX, LimitGroup.SEARCH) : List.of(LimitGroup.NORMAL, LimitGroup.FROZEN); } /** * Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit. It follows these * rules: - * - Check limits for _normal_ nodes - * - If there's no room -> return the Result for _normal_ nodes (fail-fast) - * - otherwise -> returns the Result of checking the limits for _frozen_ nodes + * - Check limits for nodes in the first group, e.g.
_normal_ nodes + * - If there's no room -> return the Result for nodes of the first group (fail-fast) + * - otherwise -> return the Result of checking the limits for the next group, e.g. _frozen_ nodes + * - Rinse and repeat if there are more groups. So far there are only two groups to check. * - * @param newShards The number of normal shards to be added by this operation - * @param newFrozenShards The number of frozen shards to be added by this operation - * @param discoveryNodes The nodes in the cluster - * @param metadata The cluster state metadata - */ - private Result checkShardLimitOnBothGroups(int newShards, int newFrozenShards, DiscoveryNodes discoveryNodes, Metadata metadata) { - // we verify the two limits independently. This also means that if they have mixed frozen and other data-roles nodes, such a mixed - // node can have both 1000 normal and 3000 frozen shards. This is the trade-off to keep the simplicity of the counts. We advocate - // against such mixed nodes for production use anyway. - int frozenNodeCount = nodeCount(discoveryNodes, ShardLimitValidator::hasFrozen); - int normalNodeCount = nodeCount(discoveryNodes, ShardLimitValidator::hasNonFrozen); - - var result = checkShardLimit(newShards, metadata, getShardLimitPerNode(), normalNodeCount, NORMAL_GROUP); - // fail-fast: in case there's no room on the `normal` nodes, just return the result of that check. - if (result.canAddShards() == false) { - return result; - } - return checkShardLimit(newFrozenShards, metadata, shardLimitPerNodeFrozen.get(), frozenNodeCount, FROZEN_GROUP); - } - - /** - * This method checks whether there is enough room in the cluster to add the given number of shards with the given number of replicas - * without exceeding the "cluster.max_shards_per_node" setting for _normal_ nodes. This check does not guarantee that the number of - * shards can be added, just that there is theoretically room to add them without exceeding the shards per node configuration. - * @param maxConfiguredShardsPerNode The maximum available number of shards to be allocated within a node - * @param numberOfNewShards The number of primary shards that we want to be able to add to the cluster - * @param replicas The number of replicas of the primary shards that we want to be able to add to the cluster - * @param discoveryNodes The nodes in the cluster, used to get the number of open shard already in the cluster - * @param metadata The cluster state metadata, used to get the cluster settings - */ - public static Result checkShardLimitForNormalNodes( - int maxConfiguredShardsPerNode, - int numberOfNewShards, - int replicas, - DiscoveryNodes discoveryNodes, - Metadata metadata - ) { - return checkShardLimit( - numberOfNewShards * (1 + replicas), - metadata, - maxConfiguredShardsPerNode, - nodeCount(discoveryNodes, ShardLimitValidator::hasNonFrozen), - NORMAL_GROUP - ); - } - - /** - * This method checks whether there is enough room in the cluster to add the given number of shards with the given number of replicas - * without exceeding the "cluster.max_shards_per_node_frozen" setting for _frozen_ nodes. This check does not guarantee that the number - * of shards can be added, just that there is theoretically room to add them without exceeding the shards per node configuration.
- * @param maxConfiguredShardsPerNode The maximum available number of shards to be allocated within a node - * @param numberOfNewShards The number of primary shards that we want to be able to add to the cluster - * @param replicas The number of replicas of the primary shards that we want to be able to add to the cluster - * @param discoveryNodes The nodes in the cluster, used to get the number of open shard already in the cluster - * @param metadata The cluster state metadata, used to get the cluster settings + * @param limitGroups The applicable limit groups to check for shard limits + * @param shardsToCreatePerGroup The number of new shards to create per limit group + * @param discoveryNodes The nodes in the cluster + * @param metadata The cluster state metadata */ - public static Result checkShardLimitForFrozenNodes( - int maxConfiguredShardsPerNode, - int numberOfNewShards, - int replicas, + private Result checkShardLimitOnGroups( + List<LimitGroup> limitGroups, + Map<LimitGroup, Integer> shardsToCreatePerGroup, DiscoveryNodes discoveryNodes, Metadata metadata ) { - return checkShardLimit( - numberOfNewShards * (1 + replicas), - metadata, - maxConfiguredShardsPerNode, - nodeCount(discoveryNodes, ShardLimitValidator::hasFrozen), - FROZEN_GROUP - ); - } - - private static Result checkShardLimit(int newShards, Metadata metadata, int maxConfiguredShardsPerNode, int nodeCount, String group) { - int maxShardsInCluster = maxConfiguredShardsPerNode * nodeCount; - int currentOpenShards = metadata.getTotalOpenIndexShards(); - - // Only enforce the shard limit if we have at least one data node, so that we don't block - // index creation during cluster setup - if (nodeCount == 0 || newShards <= 0) { - return new Result(true, Optional.empty(), newShards, maxShardsInCluster, group); - } - - if ((currentOpenShards + newShards) > maxShardsInCluster) { - Predicate<IndexMetadata> indexMetadataPredicate = imd -> imd.getState().equals(IndexMetadata.State.OPEN) - && group.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(imd.getSettings())); - long currentFilteredShards = metadata.projects() - .values() - .stream() - .flatMap(projectMetadata -> projectMetadata.indices().values().stream()) - .filter(indexMetadataPredicate) - .mapToInt(IndexMetadata::getTotalNumberOfShards) - .sum(); - - if ((currentFilteredShards + newShards) > maxShardsInCluster) { - return new Result(false, Optional.of(currentFilteredShards), newShards, maxShardsInCluster, group); + assert limitGroups.containsAll(shardsToCreatePerGroup.keySet()) : "limit groups " + limitGroups + " do not contain groups for shards creation " + shardsToCreatePerGroup.keySet(); + // we verify the two limits independently. This also means that if they have mixed frozen and other data-roles nodes, such a mixed + // node can have both 1000 normal and 3000 frozen shards. This is the trade-off to keep the simplicity of the counts. We advocate + // against such mixed nodes for production use anyway. + Result result = null; + for (var limitGroup : limitGroups) { + result = limitGroup.checkShardLimit( + getShardLimitPerNode(limitGroup), + shardsToCreatePerGroup.getOrDefault(limitGroup, 0), + discoveryNodes, + metadata + ); + // fail-fast: in case there's no room on an earlier group, e.g. `normal`, just return the result of that check.
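To make the inputs of this loop concrete, here is a hypothetical walk-through of how a caller such as `validateShardLimit` fills `shardsToCreatePerGroup` on a stateless cluster (values invented; `applicableLimitGroups` and `newShardsTotal` are the methods introduced by this change):

```java
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.indices.ShardLimitValidator.LimitGroup;

static Map<LimitGroup, Integer> exampleShardsToCreatePerGroup() {
    Map<LimitGroup, Integer> shardsToCreatePerGroup = new HashMap<>();
    // Stateless clusters check the INDEX and SEARCH groups. For an index with
    // 3 primaries and 2 replicas: INDEX counts primaries only (3), while
    // SEARCH counts replica copies only (3 * 2 = 6).
    for (LimitGroup group : ShardLimitValidator.applicableLimitGroups(true)) {
        shardsToCreatePerGroup.merge(group, group.newShardsTotal(3, 2), Integer::sum);
    }
    return shardsToCreatePerGroup; // {INDEX=3, SEARCH=6}
}
```

Each entry is then checked against the node count and per-node limit of its own group; the first group without room short-circuits the check, as the `if` below shows.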
+ if (result.canAddShards() == false) { + return result; } } - return new Result(true, Optional.empty(), newShards, maxShardsInCluster, group); + assert result != null; + return result; } private static int nodeCount(DiscoveryNodes discoveryNodes, Predicate nodePredicate) { @@ -312,6 +249,211 @@ static String errorMessageFrom(Result result) { + ReferenceDocs.MAX_SHARDS_PER_NODE; } + public enum LimitGroup { + NORMAL(NORMAL_GROUP) { + @Override + public int numberOfNodes(DiscoveryNodes discoveryNodes) { + return nodeCount(discoveryNodes, ShardLimitValidator::hasNonFrozen); + } + + @Override + public int countShards(IndexMetadata indexMetadata) { + return isOpenIndex(indexMetadata) && matchesIndexSettingGroup(indexMetadata, LimitGroup.NORMAL.groupName()) + ? indexMetadata.getTotalNumberOfShards() + : 0; + } + + @Override + public int newShardsTotal(int shards, int replicas) { + return shards * (1 + replicas); + } + + @Override + protected int newReplicaShards(boolean isFrozenIndex, int shards, int replicaIncrease) { + return isFrozenIndex ? 0 : shards * replicaIncrease; + } + }, + FROZEN(FROZEN_GROUP) { + @Override + public int numberOfNodes(DiscoveryNodes discoveryNodes) { + return nodeCount(discoveryNodes, ShardLimitValidator::hasFrozen); + } + + @Override + public int countShards(IndexMetadata indexMetadata) { + return isOpenIndex(indexMetadata) && matchesIndexSettingGroup(indexMetadata, LimitGroup.FROZEN.groupName()) + ? indexMetadata.getTotalNumberOfShards() + : 0; + } + + @Override + public int newShardsTotal(int shards, int replicas) { + return shards * (1 + replicas); + } + + @Override + protected int newReplicaShards(boolean isFrozenIndex, int shards, int replicaIncrease) { + return isFrozenIndex ? shards * replicaIncrease : 0; + } + }, + INDEX("index") { + @Override + public int numberOfNodes(DiscoveryNodes discoveryNodes) { + return nodeCount(discoveryNodes, node -> node.hasRole(DiscoveryNodeRole.INDEX_ROLE.roleName())); + } + + @Override + public int countShards(IndexMetadata indexMetadata) { + return isOpenIndex(indexMetadata) ? indexMetadata.getNumberOfShards() : 0; + } + + @Override + public int newShardsTotal(int shards, int replicas) { + return shards; + } + + @Override + protected int newReplicaShards(boolean isFrozenIndex, int shards, int replicaIncrease) { + return 0; + } + }, + SEARCH("search") { + @Override + public int numberOfNodes(DiscoveryNodes discoveryNodes) { + return nodeCount(discoveryNodes, node -> node.hasRole(DiscoveryNodeRole.SEARCH_ROLE.roleName())); + } + + @Override + public int countShards(IndexMetadata indexMetadata) { + return isOpenIndex(indexMetadata) ? indexMetadata.getNumberOfShards() * indexMetadata.getNumberOfReplicas() : 0; + } + + @Override + public int newShardsTotal(int shards, int replicas) { + return shards * replicas; + } + + @Override + protected int newReplicaShards(boolean isFrozenIndex, int shards, int replicaIncrease) { + return shards * replicaIncrease; + } + }; + + private final String groupName; + + LimitGroup(String groupName) { + this.groupName = groupName; + } + + public String groupName() { + return groupName; + } + + @Override + public String toString() { + return groupName; + } + + public abstract int numberOfNodes(DiscoveryNodes discoveryNodes); + + public abstract int countShards(IndexMetadata indexMetadata); + + /** + * Compute the total number of new shards including both primaries and replicas that would be created for the given + * number of shards and replicas in this group. 
+ * @param shards Number of primary shards + * @param replicas Number of replica shards per primary + * @return Number of total new shards to be created for the group. + */ + public abstract int newShardsTotal(int shards, int replicas); + + protected abstract int newReplicaShards(boolean isFrozenIndex, int shards, int replicaIncrease); + + /** + * Compute the total number of new shards including both primaries and replicas that would be created for an index with the + * given index settings. + * @param indexSettings The index settings for the index to be created. + * @return The total number of new shards to be created for this group. + */ + public int newShardsTotal(Settings indexSettings) { + final boolean isFrozenIndex = FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(indexSettings)); + final int numberOfShards = (isFrozenIndex == (this == FROZEN)) ? INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings) : 0; + final int numberOfReplicas = (isFrozenIndex == (this == FROZEN)) + ? IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexSettings) + : 0; + return newShardsTotal(numberOfShards, numberOfReplicas); + } + + /** + * Compute the total number of new replica shards that would be created by updating the number of replicas to the given number. + * @param indexSettings The index settings for the index to be updated. + * @param updatedReplicas The updated number of replicas for the index. + * @return The number of new replica shards to be created for this group. + */ + public int newShardsTotal(Settings indexSettings, int updatedReplicas) { + final boolean isFrozenIndex = FROZEN_GROUP.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(indexSettings)); + final int shards = INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettings); + final int replicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexSettings); + final int replicaIncrease = updatedReplicas - replicas; + return newReplicaShards(isFrozenIndex, shards, replicaIncrease); + } + + /** + * This method checks whether there is enough room in the cluster to add the given number of shards with the given number of + * replicas without exceeding the max_shards_per_node requirement as specified by maxConfiguredShardsPerNode. + * This check does not guarantee that the number of shards can be added, just that there is theoretically room to add them + * without exceeding the shards per node configuration. 
+ * @param maxConfiguredShardsPerNode The maximum available number of shards to be allocated within a node + * @param numberOfNewShards The number of primary shards that we want to be able to add to the cluster + * @param replicas The number of replicas of the primary shards that we want to be able to add to the cluster + * @param discoveryNodes The nodes in the cluster, used to get the number of open shard already in the cluster + * @param metadata The cluster state metadata, used to get the cluster settings + */ + public Result checkShardLimit( + int maxConfiguredShardsPerNode, + int numberOfNewShards, + int replicas, + DiscoveryNodes discoveryNodes, + Metadata metadata + ) { + return checkShardLimit(maxConfiguredShardsPerNode, newShardsTotal(numberOfNewShards, replicas), discoveryNodes, metadata); + } + + private Result checkShardLimit(int maxConfiguredShardsPerNode, int newShards, DiscoveryNodes discoveryNodes, Metadata metadata) { + final int nodeCount = numberOfNodes(discoveryNodes); + int maxShardsInCluster = maxConfiguredShardsPerNode * nodeCount; + int currentOpenShards = metadata.getTotalOpenIndexShards(); + + // Only enforce the shard limit if we have at least one data node, so that we don't block + // index creation during cluster setup + if (nodeCount == 0 || newShards <= 0) { + return new Result(true, Optional.empty(), newShards, maxShardsInCluster, this); + } + + if ((currentOpenShards + newShards) > maxShardsInCluster) { + long currentFilteredShards = metadata.projects() + .values() + .stream() + .flatMap(projectMetadata -> projectMetadata.indices().values().stream()) + .mapToInt(this::countShards) + .sum(); + + if ((currentFilteredShards + newShards) > maxShardsInCluster) { + return new Result(false, Optional.of(currentFilteredShards), newShards, maxShardsInCluster, this); + } + } + return new Result(true, Optional.empty(), newShards, maxShardsInCluster, this); + } + } + + private static boolean isOpenIndex(IndexMetadata indexMetadata) { + return indexMetadata.getState().equals(IndexMetadata.State.OPEN); + } + + private static boolean matchesIndexSettingGroup(IndexMetadata indexMetadata, String group) { + return group.equals(INDEX_SETTING_SHARD_LIMIT_GROUP.get(indexMetadata.getSettings())); + } + /** * A Result object containing enough information to be used by external callers about the state of the cluster from the shard limits * perspective. */ @@ -321,6 +463,7 @@ public record Result( Optional<Long> currentUsedShards, int totalShardsToAdd, int maxShardsInCluster, - String group + LimitGroup group ) {} + } diff --git a/server/src/test/java/org/elasticsearch/cluster/shards/ShardCounts.java b/server/src/test/java/org/elasticsearch/cluster/shards/ShardCounts.java index c6e8551152e94..ab8339cafb403 100644 --- a/server/src/test/java/org/elasticsearch/cluster/shards/ShardCounts.java +++ b/server/src/test/java/org/elasticsearch/cluster/shards/ShardCounts.java @@ -58,6 +58,52 @@ public static ShardCounts forDataNodeCount(int dataNodes) { return new ShardCounts(shardsPerNode, mainIndexShards, mainIndexReplicas, failingIndexShards, failingIndexReplicas); } + private static final int PER_INDEX_MAX_NUMBER_OF_SHARDS = 100; // The actual limit is 1024. But 100 is sufficient for tests
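Before the test fixtures below, a worked example of the arithmetic in `LimitGroup.checkShardLimit` may help (all numbers invented):

```java
// maxShardsInCluster = maxConfiguredShardsPerNode * numberOfNodes(group)
static boolean wouldExceedIndexGroupLimit() {
    int maxConfiguredShardsPerNode = 100;
    int indexNodeCount = 2;
    int maxShardsInCluster = maxConfiguredShardsPerNode * indexNodeCount; // 200

    // For the INDEX group only open primaries count (countShards ignores replicas).
    int currentOpenShards = 195; // primaries already open across the cluster
    int newShards = 8;           // primaries a new index would add

    return (currentOpenShards + newShards) > maxShardsInCluster; // 203 > 200 -> rejected
}
```

The generators below are built around the same relation: `shardsPerNode` is chosen so the main index fits, and the failing index is sized so that its new shards push the group past `shardsPerNode * nodes`.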
+ public static ShardCounts forIndexNodeCount(int numIndexNodes) { + int mainIndexShards = ESTestCase.between(1, PER_INDEX_MAX_NUMBER_OF_SHARDS); + int mainIndexReplicas = ESTestCase.between(0, 1); + + // Must at least fit the primary shards of the main index + int shardsPerNode = mainIndexShards / numIndexNodes + 1; + // No more than the per-index maximum number of shards can be created for one index + int maxShardsPerNode = (mainIndexShards + PER_INDEX_MAX_NUMBER_OF_SHARDS - 1) / numIndexNodes; + if (shardsPerNode < maxShardsPerNode) { + shardsPerNode = ESTestCase.between(shardsPerNode, maxShardsPerNode); + } + + // Number of new shards for the failing index should be over the limit determined by shardsPerNode + int failingIndexShards = ESTestCase.between(shardsPerNode * numIndexNodes - mainIndexShards + 1, PER_INDEX_MAX_NUMBER_OF_SHARDS); + int failingIndexReplicas = ESTestCase.between(0, 1); + + return new ShardCounts(shardsPerNode, mainIndexShards, mainIndexReplicas, failingIndexShards, failingIndexReplicas); + } + + public static ShardCounts forSearchNodeCount(int numSearchNodes) { + int mainIndexShards = ESTestCase.between(1, PER_INDEX_MAX_NUMBER_OF_SHARDS); + int mainIndexReplicas = ESTestCase.between(1, numSearchNodes); + int failingIndexShards = ESTestCase.between(2, PER_INDEX_MAX_NUMBER_OF_SHARDS); + + // Must at least fit all primary shards from both indices and the replica shards from the main index (numSearchNodes = numIndexNodes) + int shardsPerNode = Math.max( + (mainIndexShards + failingIndexShards) / numSearchNodes, + (mainIndexShards * mainIndexReplicas) / numSearchNodes + ) + 1; + // No more than the max number of replica shards can be created for the failing index + int maxShardsPerNode = (mainIndexShards * mainIndexReplicas + failingIndexShards * numSearchNodes - 1) / numSearchNodes; + if (shardsPerNode < maxShardsPerNode) { + shardsPerNode = ESTestCase.between(shardsPerNode, maxShardsPerNode); + } + + int failingIndexReplicas = (int) Math.ceil( + (double) (shardsPerNode * numSearchNodes - mainIndexShards * mainIndexReplicas + 1) / failingIndexShards + ); + + assert failingIndexReplicas <= numSearchNodes : failingIndexReplicas + " > " + numSearchNodes; + + return new ShardCounts(shardsPerNode, mainIndexShards, mainIndexReplicas, failingIndexShards, failingIndexReplicas); + } + public int getShardsPerNode() { return shardsPerNode; } diff --git a/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceStatelessTests.java b/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceStatelessTests.java new file mode 100644 index 0000000000000..20005e3705aba --- /dev/null +++ b/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceStatelessTests.java @@ -0,0 +1,218 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1".
+ */ + +package org.elasticsearch.health.node; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.health.HealthStatus; +import org.elasticsearch.health.metadata.HealthMetadata; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; +import static org.elasticsearch.health.HealthStatus.RED; +import static org.elasticsearch.health.HealthStatus.YELLOW; +import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; +import static org.hamcrest.Matchers.is; + +public class ShardsCapacityHealthIndicatorServiceStatelessTests extends ESTestCase { + + public static final HealthMetadata.Disk DISK_METADATA = HealthMetadata.Disk.newBuilder().build(); + + private static ThreadPool threadPool; + + private ClusterService clusterService; + + @Before + public void setUp() throws Exception { + super.setUp(); + clusterService = ClusterServiceUtils.createClusterService(threadPool, Settings.builder().put("stateless.enabled", true).build()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + clusterService.close(); + } + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(getTestClass().getSimpleName()); + } + + @AfterClass + public static void tearDownThreadPool() { + terminate(threadPool); + } + + public void testIndicatorYieldsGreenInCaseThereIsRoom() throws IOException { + int maxShardsPerNode = randomValidMaxShards(); + var clusterService = createClusterService(maxShardsPerNode, 1, 1, createIndex(maxShardsPerNode / 4)); + var indicatorResult = new ShardsCapacityHealthIndicatorService(clusterService).calculate(true, HealthInfo.EMPTY_HEALTH_INFO); + + assertEquals(HealthStatus.GREEN, indicatorResult.status()); + assertTrue(indicatorResult.impacts().isEmpty()); + assertTrue(indicatorResult.diagnosisList().isEmpty()); + assertEquals("The cluster has enough room to add new shards.", indicatorResult.symptom()); + assertThat( + xContentToMap(indicatorResult.details()), + is( + Map.of( + "index", + Map.of("max_shards_in_cluster", maxShardsPerNode), + "search", + Map.of("max_shards_in_cluster", maxShardsPerNode) + ) + ) + ); + } + + public void 
testIndicatorYieldsColorInCaseThereIsNotEnoughRoom() { + final int maxShardsPerNode = randomValidMaxShards(); + // Red health if there is no room for 5 more shards, so we take 4 out of the max + doTestIndicatorYieldsColorInCaseThereIsNotEnoughRoom(RED, maxShardsPerNode, maxShardsPerNode - 4); + // Yellow health if there is no room for 10 more shards, so we take 9 out of the max + doTestIndicatorYieldsColorInCaseThereIsNotEnoughRoom(YELLOW, maxShardsPerNode, maxShardsPerNode - 9); + } + + private void doTestIndicatorYieldsColorInCaseThereIsNotEnoughRoom(HealthStatus status, int maxShardsPerNode, int indexNumShards) { + { + // Only index does not have enough space + var clusterService = createClusterService(maxShardsPerNode, 1, 2, createIndex(indexNumShards)); + var indicatorResult = new ShardsCapacityHealthIndicatorService(clusterService).calculate(true, HealthInfo.EMPTY_HEALTH_INFO); + + assertEquals(indicatorResult.status(), status); + assertEquals( + "Cluster is close to reaching the configured maximum number of shards for index nodes.", + indicatorResult.symptom() + ); + } + { + // Only search does not have enough space + var clusterService = createClusterService(maxShardsPerNode, 2, 1, createIndex(indexNumShards)); + var indicatorResult = new ShardsCapacityHealthIndicatorService(clusterService).calculate(true, HealthInfo.EMPTY_HEALTH_INFO); + + assertEquals(indicatorResult.status(), status); + assertEquals( + "Cluster is close to reaching the configured maximum number of shards for search nodes.", + indicatorResult.symptom() + ); + } + { + // Neither the index nor the search nodes have enough space + var clusterService = createClusterService(maxShardsPerNode, 1, 1, createIndex(indexNumShards)); + var indicatorResult = new ShardsCapacityHealthIndicatorService(clusterService).calculate(true, HealthInfo.EMPTY_HEALTH_INFO); + + assertEquals(indicatorResult.status(), status); + assertEquals( + "Cluster is close to reaching the configured maximum number of shards for index and search nodes.", + indicatorResult.symptom() + ); + } + } + + private static int randomValidMaxShards() { + return randomIntBetween(10, 100); + } + + private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + xcontent.toXContent(builder, ToXContent.EMPTY_PARAMS); + XContentParser parser = XContentType.JSON.xContent() + .createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput()); + return parser.map(); + } + + private ClusterService createClusterService( + int maxShardsPerNode, + int numIndexNodes, + int numSearchNodes, + IndexMetadata.Builder... indexMetadata + ) { + final ClusterState clusterState = createClusterState( + nodesWithIndexAndSearch(numIndexNodes, numSearchNodes), + maxShardsPerNode, + new HealthMetadata(DISK_METADATA, new HealthMetadata.ShardLimits(maxShardsPerNode, 0)), + indexMetadata + ); + ClusterServiceUtils.setState(clusterService, clusterState); + return clusterService; + } + + private ClusterState createClusterState( + DiscoveryNodes discoveryNodes, + int maxShardsPerNode, + HealthMetadata healthMetadata, + IndexMetadata.Builder...
indexMetadata + ) { + var clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .nodes(discoveryNodes) + .build() + .copyAndUpdate(b -> b.putCustom(HealthMetadata.TYPE, healthMetadata)); + + var metadata = Metadata.builder() + .persistentSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), maxShardsPerNode).build()); + + for (var idxMetadata : indexMetadata) { + metadata.put(ProjectMetadata.builder(ProjectId.DEFAULT).put(idxMetadata)); + } + + return ClusterState.builder(clusterState).metadata(metadata).build(); + } + + private DiscoveryNodes nodesWithIndexAndSearch(int numIndexNodes, int numSearchNodes) { + assert numIndexNodes > 0 : "there must be at least one index node"; + final String indexNodeId = "index"; + final DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + builder.add( + DiscoveryNodeUtils.builder(indexNodeId).roles(Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.INDEX_ROLE)).build() + ); + + for (int i = 1; i < numIndexNodes; i++) { + builder.add(DiscoveryNodeUtils.builder("index-" + i).roles(Set.of(DiscoveryNodeRole.INDEX_ROLE)).build()); + } + for (int i = 0; i < numSearchNodes; i++) { + builder.add(DiscoveryNodeUtils.builder("search-" + i).roles(Set.of(DiscoveryNodeRole.SEARCH_ROLE)).build()); + } + return builder.localNodeId(indexNodeId).masterNodeId(indexNodeId).build(); + } + + private static IndexMetadata.Builder createIndex(int shards) { + return IndexMetadata.builder("index-" + randomAlphaOfLength(20)) + .settings(indexSettings(IndexVersion.current(), shards, 1).put(SETTING_CREATION_DATE, System.currentTimeMillis())); + } + +} diff --git a/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceTests.java b/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceTests.java index 1ab7ca5633e76..ed46f6a50cafa 100644 --- a/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorServiceTests.java @@ -351,7 +351,7 @@ public void testCalculateMethods() { Optional.empty(), randomInt(), randomInt(), - randomAlphaOfLength(5) + randomFrom(ShardLimitValidator.LimitGroup.values()) ); }; diff --git a/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java b/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java index 46bff920e8303..d2b1130d41eb7 100644 --- a/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java +++ b/server/src/test/java/org/elasticsearch/indices/ShardLimitValidatorTests.java @@ -25,40 +25,32 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.indices.ShardLimitValidator.LimitGroup; import org.elasticsearch.test.ESTestCase; import java.util.Set; +import java.util.stream.IntStream; import static org.elasticsearch.cluster.metadata.MetadataIndexStateServiceTests.addClosedIndex; import static org.elasticsearch.cluster.metadata.MetadataIndexStateServiceTests.addOpenedIndex; import static org.elasticsearch.cluster.shards.ShardCounts.forDataNodeCount; -import static org.elasticsearch.indices.ShardLimitValidator.FROZEN_GROUP; -import static org.elasticsearch.indices.ShardLimitValidator.NORMAL_GROUP; +import static org.elasticsearch.cluster.shards.ShardCounts.forIndexNodeCount; +import static org.elasticsearch.cluster.shards.ShardCounts.forSearchNodeCount; 
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ShardLimitValidatorTests extends ESTestCase { - @FunctionalInterface - interface CheckShardLimitMethod { - ShardLimitValidator.Result call( - int maxConfiguredShardsPerNode, - int numberOfNewShards, - int replicas, - DiscoveryNodes discoveryNodes, - Metadata metadata - ); - } - public void testOverShardLimit() { - testOverShardLimit(ShardLimitValidator::checkShardLimitForNormalNodes, NORMAL_GROUP); - testOverShardLimit(ShardLimitValidator::checkShardLimitForFrozenNodes, FROZEN_GROUP); + testOverShardLimit(LimitGroup.NORMAL, between(1, 90)); + testOverShardLimit(LimitGroup.FROZEN, between(1, 90)); + testOverShardLimit(LimitGroup.INDEX, between(1, 40)); + testOverShardLimit(LimitGroup.SEARCH, between(2, 40)); } - private void testOverShardLimit(CheckShardLimitMethod targetMethod, String group) { - int nodesInCluster = randomIntBetween(1, 90); - ShardCounts counts = forDataNodeCount(nodesInCluster); + private void testOverShardLimit(LimitGroup group, int nodesInCluster) { + final ShardCounts counts = computeShardCounts(group, nodesInCluster); ClusterState state = createClusterForShardLimitTest( nodesInCluster, counts.getFirstIndexShards(), @@ -66,7 +58,8 @@ private void testOverShardLimit(CheckShardLimitMethod targetMethod, String group counts.getShardsPerNode(), group ); - ShardLimitValidator.Result shardLimitsResult = targetMethod.call( + + ShardLimitValidator.Result shardLimitsResult = group.checkShardLimit( counts.getShardsPerNode(), counts.getFailingIndexShards(), counts.getFailingIndexReplicas(), @@ -74,8 +67,8 @@ private void testOverShardLimit(CheckShardLimitMethod targetMethod, String group state.metadata() ); - int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); - int currentOpenShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); + int totalShards = computeTotalNewShards(group, counts); + int currentOpenShards = computeCurrentOpenShards(group, counts); int maxShards = counts.getShardsPerNode() * nodesInCluster; assertFalse(shardLimitsResult.canAddShards()); @@ -100,14 +93,15 @@ private void testOverShardLimit(CheckShardLimitMethod targetMethod, String group } public void testUnderShardLimit() { - testUnderShardLimit(ShardLimitValidator::checkShardLimitForNormalNodes, NORMAL_GROUP); - testUnderShardLimit(ShardLimitValidator::checkShardLimitForFrozenNodes, FROZEN_GROUP); + testUnderShardLimit(LimitGroup.NORMAL, between(10, 90)); + testUnderShardLimit(LimitGroup.FROZEN, between(10, 90)); + testUnderShardLimit(LimitGroup.INDEX, between(10, 40)); + testUnderShardLimit(LimitGroup.SEARCH, between(10, 40)); } - private void testUnderShardLimit(CheckShardLimitMethod targetMethod, String group) { - int nodesInCluster = randomIntBetween(10, 90); + private void testUnderShardLimit(LimitGroup group, int nodesInCluster) { // Calculate the counts for a cluster with maximum of 60% of occupancy - ShardCounts counts = forDataNodeCount((int) (nodesInCluster * 0.6)); + ShardCounts counts = computeShardCounts(group, (int) (nodesInCluster * 0.6)); ClusterState state = createClusterForShardLimitTest( nodesInCluster, counts.getFirstIndexShards(), @@ -116,29 +110,33 @@ private void testUnderShardLimit(CheckShardLimitMethod targetMethod, String grou group ); - int replicas = randomIntBetween(0, 3); - int maxShardsInCluster = counts.getShardsPerNode() * 
nodesInCluster; - int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); - int availableRoom = maxShardsInCluster - existingShards; - int shardsToAdd = randomIntBetween(1, Math.max(availableRoom / (replicas + 1), 1)); - ShardLimitValidator.Result shardLimitsResult = targetMethod.call( + final var underLimitShardCounts = computeUnderLimitShardCounts(group, counts, nodesInCluster); + + ShardLimitValidator.Result shardLimitsResult = group.checkShardLimit( counts.getShardsPerNode(), - shardsToAdd, - replicas, + underLimitShardCounts.shards(), + underLimitShardCounts.replicas(), state.nodes(), state.metadata() ); + logger.info("--> underLimitShardCounts: " + underLimitShardCounts); + assertTrue(shardLimitsResult.canAddShards()); assertEquals(shardLimitsResult.maxShardsInCluster(), counts.getShardsPerNode() * nodesInCluster); - assertEquals(shardLimitsResult.totalShardsToAdd(), shardsToAdd * (replicas + 1)); + assertEquals(shardLimitsResult.totalShardsToAdd(), underLimitShardCounts.totalNewShards()); assertFalse(shardLimitsResult.currentUsedShards().isPresent()); assertEquals(shardLimitsResult.group(), group); } public void testValidateShardLimitOpenIndices() { - int nodesInCluster = randomIntBetween(2, 90); - ShardCounts counts = forDataNodeCount(nodesInCluster); - final String group = randomFrom(ShardLimitValidator.VALID_GROUPS); + doTestValidateShardLimitOpenIndices(LimitGroup.NORMAL, between(2, 90)); + doTestValidateShardLimitOpenIndices(LimitGroup.FROZEN, between(2, 90)); + doTestValidateShardLimitOpenIndices(LimitGroup.INDEX, between(2, 40)); + doTestValidateShardLimitOpenIndices(LimitGroup.SEARCH, between(2, 40)); + } + + private void doTestValidateShardLimitOpenIndices(LimitGroup group, int nodesInCluster) { + ShardCounts counts = computeShardCounts(group, nodesInCluster); final ClusterState state = createClusterForShardLimitTest( nodesInCluster, counts.getFirstIndexShards(), @@ -150,8 +148,6 @@ public void testValidateShardLimitOpenIndices() { Index[] indices = getIndices(state); - int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); - int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); int maxShards = counts.getShardsPerNode() * nodesInCluster; ShardLimitValidator shardLimitValidator = createTestShardLimitService(counts.getShardsPerNode(), group); ValidationException exception = expectThrows( @@ -160,9 +156,9 @@ public void testValidateShardLimitOpenIndices() { ); assertEquals( "Validation Failed: 1: this action would add [" - + totalShards + + computeTotalNewShards(group, counts) + "] shards, but this cluster currently has [" - + currentShards + + computeCurrentOpenShards(group, counts) + "]/[" + maxShards + "] maximum " @@ -175,24 +171,39 @@ public void testValidateShardLimitOpenIndices() { } public void testValidateShardLimitUpdateReplicas() { + doTestValidateShardLimitUpdateReplicas(LimitGroup.NORMAL); + doTestValidateShardLimitUpdateReplicas(LimitGroup.FROZEN); + doTestValidateShardLimitUpdateReplicas(LimitGroup.SEARCH); + } + + public void doTestValidateShardLimitUpdateReplicas(LimitGroup group) { final int nodesInCluster = randomIntBetween(2, 90); final int shardsPerNode = randomIntBetween(1, 10); - final String group = randomFrom(ShardLimitValidator.VALID_GROUPS); - ClusterState state = createClusterStateForReplicaUpdate(nodesInCluster, shardsPerNode, group); + int originalReplicas = nodesInCluster - 2; + ClusterState state = 
createClusterStateForReplicaUpdate(nodesInCluster, shardsPerNode, originalReplicas, group); final Index[] indices = getIndices(state); final ShardLimitValidator shardLimitValidator = createTestShardLimitService(shardsPerNode, group); - shardLimitValidator.validateShardLimitOnReplicaUpdate(state.nodes(), state.metadata(), indices, nodesInCluster - 1); + final int updatedValidReplicas = switch (group) { + case NORMAL, FROZEN -> nodesInCluster - 1; + case INDEX, SEARCH -> between(nodesInCluster - 1, nodesInCluster); + }; + shardLimitValidator.validateShardLimitOnReplicaUpdate(state.nodes(), state.metadata(), indices, updatedValidReplicas); + + final int updatedInvalidReplicas = switch (group) { + case NORMAL, FROZEN -> nodesInCluster; + case INDEX, SEARCH -> nodesInCluster + 1; + }; ValidationException exception = expectThrows( ValidationException.class, - () -> shardLimitValidator.validateShardLimitOnReplicaUpdate(state.nodes(), state.metadata(), indices, nodesInCluster) + () -> shardLimitValidator.validateShardLimitOnReplicaUpdate(state.nodes(), state.metadata(), indices, updatedInvalidReplicas) ); assertEquals( "Validation Failed: 1: this action would add [" - + (shardsPerNode * 2) + + (shardsPerNode * (updatedInvalidReplicas - originalReplicas)) + "] shards, but this cluster currently has [" - + (shardsPerNode * (nodesInCluster - 1)) + + computeTotalShards(group, shardsPerNode, originalReplicas) + "]/[" + shardsPerNode * nodesInCluster + "] maximum " @@ -204,15 +215,70 @@ public void testValidateShardLimitUpdateReplicas() { ); } + private ShardCounts computeShardCounts(LimitGroup group, int nodesInCluster) { + final ShardCounts shardCounts = switch (group) { + case NORMAL, FROZEN -> forDataNodeCount(nodesInCluster); + case INDEX -> forIndexNodeCount(nodesInCluster); + case SEARCH -> forSearchNodeCount(nodesInCluster); + }; + logger.info("--> group [{}], nodesInCluster: [{}] shardCounts: [{}]", group, nodesInCluster, shardCounts); + return shardCounts; + } + + private static int computeTotalNewShards(LimitGroup group, ShardCounts counts) { + return computeTotalShards(group, counts.getFailingIndexShards(), counts.getFailingIndexReplicas()); + } + + private static int computeCurrentOpenShards(LimitGroup group, ShardCounts counts) { + return computeTotalShards(group, counts.getFirstIndexShards(), counts.getFirstIndexReplicas()); + } + + private static int computeTotalShards(LimitGroup group, int shards, int replicas) { + return switch (group) { + case NORMAL, FROZEN -> shards * (1 + replicas); + case INDEX -> shards; + case SEARCH -> shards * replicas; + }; + } + + private record UnderLimitShardCounts(int shards, int replicas, int totalNewShards) {} + + private static UnderLimitShardCounts computeUnderLimitShardCounts(LimitGroup group, ShardCounts counts, int nodesInCluster) { + int maxShardsInCluster = counts.getShardsPerNode() * nodesInCluster; + return switch (group) { + case NORMAL, FROZEN -> { + int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); + int availableRoom = maxShardsInCluster - existingShards; + int replicas = randomIntBetween(0, 3); + int shards = randomIntBetween(1, Math.max(availableRoom / (replicas + 1), 1)); + yield new UnderLimitShardCounts(shards, replicas, shards * (replicas + 1)); + } + case INDEX -> { + int existingShards = counts.getFirstIndexShards(); + int availableRoom = maxShardsInCluster - existingShards; + int replicas = randomIntBetween(0, 1); + int shards = randomIntBetween(1, availableRoom); + yield new 
UnderLimitShardCounts(shards, replicas, shards); + } + case SEARCH -> { + int existingShards = counts.getFirstIndexShards() * counts.getFirstIndexReplicas(); + int availableRoom = maxShardsInCluster - existingShards; + int replicas = randomIntBetween(1, nodesInCluster); + int shards = randomIntBetween(1, Math.max(availableRoom / replicas, 1)); + yield new UnderLimitShardCounts(shards, replicas, shards * replicas); + } + }; + } + public Index[] getIndices(ClusterState state) { return state.metadata().getProject().indices().values().stream().map(IndexMetadata::getIndex).toList().toArray(Index.EMPTY_ARRAY); } - private ClusterState createClusterStateForReplicaUpdate(int nodesInCluster, int shardsPerNode, String group) { + private ClusterState createClusterStateForReplicaUpdate(int nodesInCluster, int shardsPerNode, int replicas, LimitGroup group) { DiscoveryNodes nodes = createDiscoveryNodes(nodesInCluster, group); ClusterState state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); - state = addOpenedIndex(Metadata.DEFAULT_PROJECT_ID, randomAlphaOfLengthBetween(5, 15), shardsPerNode, nodesInCluster - 2, state); - if (group.equals(ShardLimitValidator.FROZEN_GROUP)) { + state = addOpenedIndex(Metadata.DEFAULT_PROJECT_ID, randomAlphaOfLengthBetween(5, 15), shardsPerNode, replicas, state); + if (group == LimitGroup.FROZEN) { state = ClusterState.builder(state).metadata(freezeMetadata(Metadata.builder(state.metadata()), state.metadata())).build(); } return state; } @@ -223,13 +289,13 @@ public static ClusterState createClusterForShardLimitTest( int shardsInIndex, int replicas, int maxShardsPerNode, - String group + LimitGroup group ) { DiscoveryNodes nodes = createDiscoveryNodes(nodesInCluster, group); Settings.Builder settings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()); - if (ShardLimitValidator.FROZEN_GROUP.equals(group) || randomBoolean()) { - settings.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), group); + if (LimitGroup.FROZEN == group || randomBoolean()) { + settings.put(ShardLimitValidator.INDEX_SETTING_SHARD_LIMIT_GROUP.getKey(), group.groupName()); } IndexMetadata.Builder indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 15)) .settings(settings) @@ -257,7 +323,7 @@ public static ClusterState createClusterForShardLimitTest( int openIndexReplicas, int closedIndexShards, int closedIndexReplicas, - String group + LimitGroup group ) { DiscoveryNodes nodes = createDiscoveryNodes(nodesInCluster, group); @@ -277,20 +343,19 @@ public static ClusterState createClusterForShardLimitTest( } else { metadata.transientSettings(Settings.EMPTY); } - if (ShardLimitValidator.FROZEN_GROUP.equals(group)) { + if (LimitGroup.FROZEN == group) { freezeMetadata(metadata, state.metadata()); } return ClusterState.builder(state).metadata(metadata).nodes(nodes).build(); } - public static DiscoveryNodes createDiscoveryNodes(int nodesInCluster, String group) { + public static DiscoveryNodes createDiscoveryNodes(int nodesInCluster, LimitGroup group) { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); for (int i = 0; i < nodesInCluster; i++) { Set<DiscoveryNodeRole> roles; - if (ShardLimitValidator.FROZEN_GROUP.equals(group)) { - roles = randomBoolean() ? DiscoveryNodeRole.roles() : Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); - } else { - roles = randomBoolean() + roles = switch (group) { + case FROZEN -> randomBoolean() ?
DiscoveryNodeRole.roles() : Set.of(DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE); + case NORMAL -> randomBoolean() ? DiscoveryNodeRole.roles() : Set.of( randomFrom( @@ -300,10 +365,30 @@ public static DiscoveryNodes createDiscoveryNodes(int nodesInCluster, String gro DiscoveryNodeRole.DATA_COLD_NODE_ROLE ) ); - } - + case INDEX -> Set.of(DiscoveryNodeRole.INDEX_ROLE); + case SEARCH -> Set.of(DiscoveryNodeRole.SEARCH_ROLE); + }; builder.add(DiscoveryNodeUtils.builder(randomAlphaOfLengthBetween(5, 15)).roles(roles).build()); } + + if (group == LimitGroup.INDEX) { + // Also add search nodes for the index limit group; they should not affect the result of the index group + IntStream.range(0, nodesInCluster + 1) + .forEach( + i -> builder.add( + DiscoveryNodeUtils.builder(randomAlphaOfLength(16) + i).roles(Set.of(DiscoveryNodeRole.SEARCH_ROLE)).build() + ) + ); + } else if (group == LimitGroup.SEARCH) { + // Also add index nodes for the search limit group; they should not affect the result of the search group + IntStream.range(0, nodesInCluster + 1) + .forEach( + i -> builder.add( + DiscoveryNodeUtils.builder(randomAlphaOfLength(16) + i).roles(Set.of(DiscoveryNodeRole.INDEX_ROLE)).build() + ) + ); + } + return builder.build(); } @@ -324,14 +409,19 @@ private static Metadata.Builder freezeMetadata(Metadata.Builder builder, Metadat return builder; } - public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNode, String group) { - Setting<Integer> setting = ShardLimitValidator.FROZEN_GROUP.equals(group) + public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNode, LimitGroup group) { + Setting<Integer> setting = LimitGroup.FROZEN == group ? ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN : SETTING_CLUSTER_MAX_SHARDS_PER_NODE; // Use a mocked clusterService - for unit tests we won't be updating the setting anyway. ClusterService clusterService = mock(ClusterService.class); - Settings limitOnlySettings = Settings.builder().put(setting.getKey(), maxShardsPerNode).build(); + + final Settings.Builder settingsBuilder = Settings.builder(); + if (group == LimitGroup.INDEX || group == LimitGroup.SEARCH) { + settingsBuilder.put("stateless.enabled", true); + } + Settings limitOnlySettings = settingsBuilder.put(setting.getKey(), maxShardsPerNode).build(); when(clusterService.getClusterSettings()).thenReturn( new ClusterSettings(limitOnlySettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) );
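Taken together, an over-the-limit check for the new SEARCH group would look roughly like this: a sketch mirroring `testOverShardLimit` above (not an actual test from the change); it assumes the helpers defined in these test classes are visible:

```java
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.shards.ShardCounts;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.indices.ShardLimitValidator.LimitGroup;

public void testSearchGroupOverLimitSketch() {
    // Three search nodes; forSearchNodeCount sizes a "failing" index whose
    // replica copies overflow the search tier.
    ShardCounts counts = ShardCounts.forSearchNodeCount(3);
    ClusterState state = ShardLimitValidatorTests.createClusterForShardLimitTest(
        3,
        counts.getFirstIndexShards(),
        counts.getFirstIndexReplicas(),
        counts.getShardsPerNode(),
        LimitGroup.SEARCH
    );

    // SEARCH counts only replica copies, against nodes holding the search role.
    ShardLimitValidator.Result result = LimitGroup.SEARCH.checkShardLimit(
        counts.getShardsPerNode(),
        counts.getFailingIndexShards(),
        counts.getFailingIndexReplicas(),
        state.nodes(),
        state.metadata()
    );
    assertFalse(result.canAddShards());
}
```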