Skip to content

Commit 9fce22d

Browse files
authored
Add delayed allocation diagnosis case to shards availability indicator (#89056) (#90018)
This PR adds diagnosis logic to the shards availability health indicator that detects when a shard allocation is delayed. This usually happens when a node that a shard is allocated to disappears. It is often better to delay the recovery of a shard in case the node that hosts it comes back. Shards that are delayed in this manner have special flags set on their unassigned info that denote a delayed allocation. This change adds a diagnosis to the indicator that identifies these delayed shards and provides guidance stating that they will eventually allocate on their own once the delay elapses, but if allocation is required immediately, an index setting can be updated to perform the allocation. This PR also includes some light integration testing to ensure that more unassigned cases are covered by the indicator.
1 parent 04d6a06 commit 9fce22d

File tree

4 files changed

+224
-3
lines changed

4 files changed

+224
-3
lines changed

docs/changelog/89056.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 89056
2+
summary: Add delayed allocation diagnosis case to shards availability indicator
3+
area: Health
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAvailabilityHealthIndicatorService.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,16 @@ public HealthIndicatorResult calculate(boolean explain) {
147147
DIAGNOSE_SHARDS_ACTION_GUIDE
148148
);
149149

150+
/** Help URL for delayed shard allocation; passed as the last argument of the diagnosis definition declared next. */
public static final String FIX_DELAYED_SHARDS_GUIDE = "http://ela.st/fix-delayed-shard-allocation";
151+
/**
 * Diagnosis reported for unassigned shards whose allocation is delayed. In this change it is added when the last
 * allocation status is {@code NO_ATTEMPT} and the shard's unassigned info reports {@code isDelayed()}, and when the
 * status is {@code DELAYED_ALLOCATION}.
 * <p>
 * NOTE(review): the arguments look like (id, cause, recommended action, help URL) judging by the literal contents;
 * confirm the parameter order against {@code Diagnosis.Definition}.
 */
public static final Diagnosis.Definition DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS = new Diagnosis.Definition(
152+
"delayed_shard_allocations",
153+
"Elasticsearch is not allocating some shards because they are marked for delayed allocation. Shards that have become "
154+
+ "unavailable are usually marked for delayed allocation because it is more efficient to wait and see if the shards return "
155+
+ "on their own than to recover the shard immediately.",
156+
"Elasticsearch will reallocate the shards when the delay has elapsed. No action is required by the user.",
157+
FIX_DELAYED_SHARDS_GUIDE
158+
);
159+
150160
public static final String ENABLE_INDEX_ALLOCATION_GUIDE = "http://ela.st/fix-index-allocation";
151161
public static final Diagnosis.Definition ACTION_ENABLE_INDEX_ROUTING_ALLOCATION = new Diagnosis.Definition(
152162
"enable_index_allocations",
@@ -413,10 +423,18 @@ List<Diagnosis.Definition> diagnoseUnassignedShardRouting(ShardRouting shardRout
413423
actions.add(ACTION_RESTORE_FROM_SNAPSHOT);
414424
}
415425
break;
426+
case NO_ATTEMPT:
427+
if (shardRouting.unassignedInfo().isDelayed()) {
428+
actions.add(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS);
429+
} else {
430+
actions.addAll(explainAllocationsAndDiagnoseDeciders(shardRouting, state));
431+
}
432+
break;
416433
case DECIDERS_NO:
417434
actions.addAll(explainAllocationsAndDiagnoseDeciders(shardRouting, state));
418435
break;
419-
default:
436+
case DELAYED_ALLOCATION:
437+
actions.add(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS);
420438
break;
421439
}
422440
if (actions.isEmpty()) {

server/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsAvailabilityHealthIndicatorServiceTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.ACTION_MIGRATE_TIERS_AWAY_FROM_INCLUDE_DATA_LOOKUP;
7474
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.ACTION_MIGRATE_TIERS_AWAY_FROM_REQUIRE_DATA_LOOKUP;
7575
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.ACTION_RESTORE_FROM_SNAPSHOT;
76+
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS;
7677
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.NAME;
7778
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.AVAILABLE;
7879
import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.INITIALIZING;
@@ -386,7 +387,7 @@ public void testShouldBeYellowWhenRestartingReplicasReachedAllocationDelay() {
386387
List.of(ImpactArea.SEARCH)
387388
)
388389
),
389-
List.of(new Diagnosis(ACTION_CHECK_ALLOCATION_EXPLAIN_API, List.of("restarting-index")))
390+
List.of(new Diagnosis(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS, List.of("restarting-index")))
390391
)
391392
)
392393
);
@@ -460,7 +461,7 @@ public void testShouldBeRedWhenRestartingPrimariesReachedAllocationDelayAndNoRep
460461
List.of(ImpactArea.INGEST, ImpactArea.SEARCH)
461462
)
462463
),
463-
List.of(new Diagnosis(ACTION_CHECK_ALLOCATION_EXPLAIN_API, List.of("restarting-index")))
464+
List.of(new Diagnosis(DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS, List.of("restarting-index")))
464465
)
465466
)
466467
);
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.cluster.routing.allocation;
9+
10+
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation;
11+
import org.elasticsearch.action.index.IndexRequestBuilder;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
15+
import org.elasticsearch.cluster.routing.RoutingNodesHelper;
16+
import org.elasticsearch.cluster.routing.ShardRouting;
17+
import org.elasticsearch.cluster.routing.ShardRoutingState;
18+
import org.elasticsearch.cluster.routing.UnassignedInfo;
19+
import org.elasticsearch.cluster.routing.allocation.DataTier;
20+
import org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService;
21+
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.xcontent.XContentHelper;
23+
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.health.Diagnosis;
25+
import org.elasticsearch.health.GetHealthAction;
26+
import org.elasticsearch.health.HealthIndicatorResult;
27+
import org.elasticsearch.health.HealthStatus;
28+
import org.elasticsearch.plugins.Plugin;
29+
import org.elasticsearch.test.ESIntegTestCase;
30+
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
31+
import org.elasticsearch.xcontent.XContentType;
32+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
33+
34+
import java.util.Collection;
35+
import java.util.Collections;
36+
import java.util.List;
37+
import java.util.stream.Collectors;
38+
39+
import static org.elasticsearch.test.NodeRoles.onlyRole;
40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.Matchers.hasItem;
42+
43+
/**
44+
* Contains all integration tests for the {@link org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService}
45+
* that require the data tiers allocation decider logic.
46+
*/
47+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
48+
public class DataTierShardAvailabilityHealthIndicatorIT extends ESIntegTestCase {
49+
50+
@Override
51+
protected Collection<Class<? extends Plugin>> nodePlugins() {
52+
return List.of(LocalStateCompositeXPackPlugin.class);
53+
}
54+
55+
/**
56+
* Verify that the health API returns an "increase tier capacity" diagnosis when an index is created but there aren't enough nodes in
57+
* a tier to host the desired replicas on unique nodes.
58+
*/
59+
public void testIncreaseTierCapacityDiagnosisWhenCreated() throws Exception {
60+
internalCluster().startMasterOnlyNodes(1);
61+
internalCluster().startNodes(1, onlyRole(DiscoveryNodeRole.DATA_HOT_NODE_ROLE));
62+
ElasticsearchAssertions.assertAcked(
63+
prepareCreate("test").setSettings(
64+
Settings.builder()
65+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
66+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
67+
.put(DataTier.TIER_PREFERENCE, DataTier.DATA_HOT)
68+
)
69+
);
70+
ensureYellow("test");
71+
GetHealthAction.Response healthResponse = client().execute(
72+
GetHealthAction.INSTANCE,
73+
new GetHealthAction.Request(ShardsAvailabilityHealthIndicatorService.NAME, true)
74+
).get();
75+
HealthIndicatorResult indicatorResult = healthResponse.findIndicator(ShardsAvailabilityHealthIndicatorService.NAME);
76+
assertThat(indicatorResult.status(), equalTo(HealthStatus.YELLOW));
77+
assertThat(
78+
indicatorResult.diagnosisList(),
79+
hasItem(
80+
new Diagnosis(
81+
ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_TIER_CAPACITY_LOOKUP.get(DataTier.DATA_HOT),
82+
List.of("test")
83+
)
84+
)
85+
);
86+
}
87+
88+
/**
89+
* Verify that the health API returns an "increase tier capacity" diagnosis when enough nodes in a tier leave such that the tier cannot
90+
* host all of an index's replicas on unique nodes.
91+
*/
92+
public void testIncreaseTierCapacityDiagnosisWhenTierShrinksUnexpectedly() throws Exception {
93+
internalCluster().startMasterOnlyNodes(1);
94+
internalCluster().startNodes(2, onlyRole(DiscoveryNodeRole.DATA_HOT_NODE_ROLE));
95+
ElasticsearchAssertions.assertAcked(
96+
prepareCreate("test").setSettings(
97+
Settings.builder()
98+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
99+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
100+
.put(DataTier.TIER_PREFERENCE, DataTier.DATA_HOT)
101+
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0)
102+
)
103+
);
104+
ensureGreen("test");
105+
indexRandomData("test");
106+
internalCluster().stopNode(findNodeWithReplicaShard("test", 0));
107+
ensureYellow("test");
108+
GetHealthAction.Response healthResponse = client().execute(
109+
GetHealthAction.INSTANCE,
110+
new GetHealthAction.Request(ShardsAvailabilityHealthIndicatorService.NAME, true)
111+
).get();
112+
ClusterAllocationExplanation explain = client().admin()
113+
.cluster()
114+
.prepareAllocationExplain()
115+
.setIndex("test")
116+
.setShard(0)
117+
.setPrimary(false)
118+
.get()
119+
.getExplanation();
120+
logger.info(XContentHelper.toXContent(explain, XContentType.JSON, true).utf8ToString());
121+
HealthIndicatorResult indicatorResult = healthResponse.findIndicator(ShardsAvailabilityHealthIndicatorService.NAME);
122+
assertThat(indicatorResult.status(), equalTo(HealthStatus.YELLOW));
123+
assertThat(
124+
indicatorResult.diagnosisList(),
125+
hasItem(
126+
new Diagnosis(
127+
ShardsAvailabilityHealthIndicatorService.ACTION_INCREASE_TIER_CAPACITY_LOOKUP.get(DataTier.DATA_HOT),
128+
List.of("test")
129+
)
130+
)
131+
);
132+
}
133+
134+
/**
135+
* Verify that the health API returns a "YELLOW" status when a node disappears and a shard is unassigned because it is delayed.
136+
*/
137+
public void testRemovingNodeReturnsYellowForDelayedIndex() throws Exception {
138+
internalCluster().startMasterOnlyNodes(1);
139+
internalCluster().startNodes(3, onlyRole(DiscoveryNodeRole.DATA_HOT_NODE_ROLE));
140+
ElasticsearchAssertions.assertAcked(
141+
prepareCreate("test").setSettings(
142+
Settings.builder()
143+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
144+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
145+
.put(DataTier.TIER_PREFERENCE, DataTier.DATA_HOT)
146+
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMinutes(30))
147+
)
148+
);
149+
ensureGreen("test");
150+
indexRandomData("test");
151+
internalCluster().stopNode(findNodeWithPrimaryShard("test", 0));
152+
ensureYellow("test");
153+
GetHealthAction.Response healthResponse = client().execute(
154+
GetHealthAction.INSTANCE,
155+
new GetHealthAction.Request(ShardsAvailabilityHealthIndicatorService.NAME, true)
156+
).get();
157+
HealthIndicatorResult indicatorResult = healthResponse.findIndicator(ShardsAvailabilityHealthIndicatorService.NAME);
158+
assertThat(indicatorResult.status(), equalTo(HealthStatus.YELLOW));
159+
assertThat(indicatorResult.diagnosisList().size(), equalTo(1));
160+
assertThat(
161+
indicatorResult.diagnosisList(),
162+
hasItem(new Diagnosis(ShardsAvailabilityHealthIndicatorService.DIAGNOSIS_WAIT_FOR_OR_FIX_DELAYED_SHARDS, List.of("test")))
163+
);
164+
}
165+
166+
private void indexRandomData(String indexName) throws Exception {
167+
int numDocs = scaledRandomIntBetween(100, 1000);
168+
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
169+
for (int i = 0; i < builders.length; i++) {
170+
builders[i] = client().prepareIndex(indexName).setSource("field", "value");
171+
}
172+
// we want to test both full divergent copies of the shard in terms of segments, and
173+
// a case where they are the same (using sync flush), index Random does all this goodness
174+
// already
175+
indexRandom(true, builders);
176+
}
177+
178+
private String findNodeWithPrimaryShard(String indexName, int shard) {
179+
return findNodeWithShard(indexName, shard, true);
180+
}
181+
182+
private String findNodeWithReplicaShard(String indexName, int shard) {
183+
return findNodeWithShard(indexName, shard, false);
184+
}
185+
186+
private String findNodeWithShard(final String indexName, final int shard, final boolean primary) {
187+
ClusterState state = client().admin().cluster().prepareState().get().getState();
188+
List<ShardRouting> startedShards = RoutingNodesHelper.shardsWithState(state.getRoutingNodes(), ShardRoutingState.STARTED);
189+
startedShards = startedShards.stream()
190+
.filter(shardRouting -> shardRouting.getIndexName().equals(indexName))
191+
.filter(shardRouting -> shard == shardRouting.getId())
192+
.filter(shardRouting -> primary == shardRouting.primary())
193+
.collect(Collectors.toList());
194+
Collections.shuffle(startedShards, random());
195+
return state.nodes().get(startedShards.get(0).currentNodeId()).getName();
196+
}
197+
}

0 commit comments

Comments
 (0)