Skip to content

Commit 18f38f9

Browse files
authored
Separate shard limit validation for index and search tiers (#136063)
In stateless, index and search shards are distinct and must be allocated to nodes of corresponding types. Therefore the shard limit validation should be performed for them separately to avoid one shard type taking more quota than expected, similar to the separation between regular and frozen shards. Resolves: ES-12884
1 parent f5689af commit 18f38f9

File tree

7 files changed

+768
-253
lines changed

7 files changed

+768
-253
lines changed

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LazyRolloverDataStreamIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.List;
2727
import java.util.Map;
2828

29+
import static org.hamcrest.Matchers.allOf;
30+
import static org.hamcrest.Matchers.anyOf;
2931
import static org.hamcrest.Matchers.containsString;
3032
import static org.hamcrest.Matchers.endsWith;
3133
import static org.hamcrest.Matchers.equalTo;
@@ -194,7 +196,13 @@ public void testLazyRolloverFailsIndexing() throws Exception {
194196
simpleUserClient.performRequest(createDocRequest);
195197
fail("Indexing should have failed.");
196198
} catch (ResponseException responseException) {
197-
assertThat(responseException.getMessage(), containsString("this action would add [2] shards"));
199+
assertThat(
200+
responseException.getMessage(),
201+
anyOf(
202+
allOf(containsString("this action would add [2] shards"), containsString("maximum normal shards")),
203+
allOf(containsString("this action would add [1] shards"), containsString("maximum index shards"))
204+
)
205+
);
198206
}
199207

200208
updateClusterSettingsRequest = new Request("PUT", "_cluster/settings");

server/src/main/java/org/elasticsearch/health/node/ShardsCapacityHealthIndicatorService.java

Lines changed: 57 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.health.node;
1111

1212
import org.elasticsearch.cluster.metadata.Metadata;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
1314
import org.elasticsearch.cluster.node.DiscoveryNodes;
1415
import org.elasticsearch.cluster.service.ClusterService;
1516
import org.elasticsearch.common.ReferenceDocs;
@@ -25,8 +26,9 @@
2526
import org.elasticsearch.health.metadata.HealthMetadata;
2627
import org.elasticsearch.indices.ShardLimitValidator;
2728

29+
import java.util.ArrayList;
30+
import java.util.LinkedHashSet;
2831
import java.util.List;
29-
import java.util.stream.Stream;
3032

3133
/**
3234
* This indicator reports health data about the shard capacity across the cluster.
@@ -45,8 +47,6 @@ public class ShardsCapacityHealthIndicatorService implements HealthIndicatorServ
4547

4648
static final String NAME = "shards_capacity";
4749

48-
static final String DATA_NODE_NAME = "data";
49-
static final String FROZEN_NODE_NAME = "frozen";
5050
private static final String UPGRADE_BLOCKED = "The cluster has too many used shards to be able to upgrade.";
5151
private static final String UPGRADE_AT_RISK =
5252
"The cluster is running low on room to add new shard. Upgrading to a new version is at risk.";
@@ -90,9 +90,11 @@ public class ShardsCapacityHealthIndicatorService implements HealthIndicatorServ
9090
);
9191

9292
private final ClusterService clusterService;
93+
private final List<ShardLimitValidator.LimitGroup> shardLimitGroups;
9394

9495
public ShardsCapacityHealthIndicatorService(ClusterService clusterService) {
9596
this.clusterService = clusterService;
97+
this.shardLimitGroups = ShardLimitValidator.applicableLimitGroups(DiscoveryNode.isStateless(clusterService.getSettings()));
9698
}
9799

98100
@Override
@@ -109,26 +111,23 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
109111
}
110112

111113
var shardLimitsMetadata = healthMetadata.getShardLimitsMetadata();
112-
return mergeIndicators(
113-
verbose,
114-
calculateFrom(
115-
shardLimitsMetadata.maxShardsPerNode(),
116-
state.nodes(),
117-
state.metadata(),
118-
ShardLimitValidator::checkShardLimitForNormalNodes
119-
),
120-
calculateFrom(
121-
shardLimitsMetadata.maxShardsPerNodeFrozen(),
122-
state.nodes(),
123-
state.metadata(),
124-
ShardLimitValidator::checkShardLimitForFrozenNodes
114+
final List<StatusResult> statusResults = shardLimitGroups.stream()
115+
.map(
116+
limitGroup -> calculateFrom(
117+
ShardLimitValidator.getShardLimitPerNode(limitGroup, shardLimitsMetadata),
118+
state.nodes(),
119+
state.metadata(),
120+
limitGroup::checkShardLimit
121+
)
125122
)
126-
);
123+
.toList();
124+
125+
return mergeIndicators(verbose, statusResults);
127126
}
128127

129-
private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult dataNodes, StatusResult frozenNodes) {
130-
var finalStatus = HealthStatus.merge(Stream.of(dataNodes.status, frozenNodes.status));
131-
var diagnoses = List.<Diagnosis>of();
128+
private HealthIndicatorResult mergeIndicators(boolean verbose, List<StatusResult> statusResults) {
129+
var finalStatus = HealthStatus.merge(statusResults.stream().map(StatusResult::status));
130+
var diagnoses = new LinkedHashSet<Diagnosis>();
132131
var symptomBuilder = new StringBuilder();
133132

134133
if (finalStatus == HealthStatus.GREEN) {
@@ -139,19 +138,22 @@ private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult data
139138
// frozen* nodes, so we have to check each of the groups in order to provide the right message.
140139
if (finalStatus.indicatesHealthProblem()) {
141140
symptomBuilder.append("Cluster is close to reaching the configured maximum number of shards for ");
142-
if (dataNodes.status == frozenNodes.status) {
143-
symptomBuilder.append(DATA_NODE_NAME).append(" and ").append(FROZEN_NODE_NAME);
144-
diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_DATA_NODES, SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES);
145-
146-
} else if (dataNodes.status.indicatesHealthProblem()) {
147-
symptomBuilder.append(DATA_NODE_NAME);
148-
diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_DATA_NODES);
149-
150-
} else if (frozenNodes.status.indicatesHealthProblem()) {
151-
symptomBuilder.append(FROZEN_NODE_NAME);
152-
diagnoses = List.of(SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES);
141+
final var nodeTypeNames = new ArrayList<String>();
142+
for (var statusResult : statusResults) {
143+
if (statusResult.status.indicatesHealthProblem()) {
144+
nodeTypeNames.add(nodeTypeFroLimitGroup(statusResult.result.group()));
145+
diagnoses.add(diagnosisForLimitGroup(statusResult.result.group()));
146+
}
153147
}
154148

149+
assert nodeTypeNames.isEmpty() == false;
150+
symptomBuilder.append(nodeTypeNames.getFirst());
151+
for (int i = 1; i < nodeTypeNames.size() - 1; i++) {
152+
symptomBuilder.append(", ").append(nodeTypeNames.get(i));
153+
}
154+
if (nodeTypeNames.size() > 1) {
155+
symptomBuilder.append(" and ").append(nodeTypeNames.getLast());
156+
}
155157
symptomBuilder.append(" nodes.");
156158
}
157159

@@ -164,9 +166,9 @@ private HealthIndicatorResult mergeIndicators(boolean verbose, StatusResult data
164166
return createIndicator(
165167
finalStatus,
166168
symptomBuilder.toString(),
167-
verbose ? buildDetails(dataNodes.result, frozenNodes.result) : HealthIndicatorDetails.EMPTY,
169+
verbose ? buildDetails(statusResults.stream().map(StatusResult::result).toList()) : HealthIndicatorDetails.EMPTY,
168170
indicatorImpacts,
169-
verbose ? diagnoses : List.of()
171+
verbose ? List.copyOf(diagnoses) : List.of()
170172
);
171173
}
172174

@@ -189,22 +191,14 @@ static StatusResult calculateFrom(
189191
return new StatusResult(HealthStatus.GREEN, result);
190192
}
191193

192-
static HealthIndicatorDetails buildDetails(ShardLimitValidator.Result dataNodes, ShardLimitValidator.Result frozenNodes) {
194+
static HealthIndicatorDetails buildDetails(List<ShardLimitValidator.Result> results) {
193195
return (builder, params) -> {
194196
builder.startObject();
195-
{
196-
builder.startObject(DATA_NODE_NAME);
197-
builder.field("max_shards_in_cluster", dataNodes.maxShardsInCluster());
198-
if (dataNodes.currentUsedShards().isPresent()) {
199-
builder.field("current_used_shards", dataNodes.currentUsedShards().get());
200-
}
201-
builder.endObject();
202-
}
203-
{
204-
builder.startObject("frozen");
205-
builder.field("max_shards_in_cluster", frozenNodes.maxShardsInCluster());
206-
if (frozenNodes.currentUsedShards().isPresent()) {
207-
builder.field("current_used_shards", frozenNodes.currentUsedShards().get());
197+
for (var result : results) {
198+
builder.startObject(nodeTypeFroLimitGroup(result.group()));
199+
builder.field("max_shards_in_cluster", result.maxShardsInCluster());
200+
if (result.currentUsedShards().isPresent()) {
201+
builder.field("current_used_shards", result.currentUsedShards().get());
208202
}
209203
builder.endObject();
210204
}
@@ -223,6 +217,22 @@ private HealthIndicatorResult unknownIndicator() {
223217
);
224218
}
225219

220+
private static String nodeTypeFroLimitGroup(ShardLimitValidator.LimitGroup limitGroup) {
221+
return switch (limitGroup) {
222+
case NORMAL -> "data";
223+
case FROZEN -> "frozen";
224+
case INDEX -> "index";
225+
case SEARCH -> "search";
226+
};
227+
}
228+
229+
private static Diagnosis diagnosisForLimitGroup(ShardLimitValidator.LimitGroup limitGroup) {
230+
return switch (limitGroup) {
231+
case NORMAL, INDEX, SEARCH -> SHARDS_MAX_CAPACITY_REACHED_DATA_NODES;
232+
case FROZEN -> SHARDS_MAX_CAPACITY_REACHED_FROZEN_NODES;
233+
};
234+
}
235+
226236
record StatusResult(HealthStatus status, ShardLimitValidator.Result result) {}
227237

228238
@FunctionalInterface

0 commit comments

Comments
 (0)