
Commit b94a20e
Reset relocation/allocation failure counter on node join/shutdown (#119968)
We prevent retries of allocations/relocations once they see index.allocation.max_retries failed attempts (default 5). In #108987, we added resetting the allocation failure counters when a node joins the cluster. As discussed in the linked discussion, it makes sense to extend this reset to relocations as well, and to also consider node shutdown events. With this change we reset both allocation and relocation failure counters when a new node joins the cluster or shutdown metadata is applied. The subset of shutdown events that we consider, and how we track them, is more or less copied from what was done for #106998; the same logic seemed to make sense here too. Closes ES-10492
1 parent 38b0e92 commit b94a20e
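
For reference, the manual escape hatch that predates this change still exists: once a shard hits the retry limit, an operator can clear the counters with the reroute API's retry_failed flag, which goes through the same resetFailedCounter path touched in the ShardsAllocator diff below. A minimal sketch using the low-level Java REST client; the endpoint, client setup, and class name are illustrative assumptions, not part of this commit:

    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public class RetryFailedAllocationsSketch {
        public static void main(String[] args) throws Exception {
            // "localhost:9200" is a placeholder endpoint for a local cluster.
            try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
                // Asks the master to reset the failure counters and re-attempt allocation of shards
                // that were blocked by index.allocation.max_retries.
                Request reroute = new Request("POST", "/_cluster/reroute");
                reroute.addParameter("retry_failed", "true");
                Response response = client.performRequest(reroute);
                System.out.println(response.getStatusLine());
            }
        }
    }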

File tree

6 files changed: +548 additions, -11 deletions


docs/changelog/119968.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 119968
+summary: Reset relocation/allocation failure counter on node join/shutdown
+area: Allocation
+type: enhancement
+issues: []

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/AllocationFailuresResetIT.java

Lines changed: 80 additions & 5 deletions
@@ -9,17 +9,29 @@

 package org.elasticsearch.cluster.routing.allocation;

+import org.apache.logging.log4j.Level;
+import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.MockIndexEventListener;
+import org.elasticsearch.test.MockLog;

 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;

 @ClusterScope(scope = Scope.TEST, numDataNodes = 0)
 public class AllocationFailuresResetIT extends ESIntegTestCase {
@@ -49,7 +61,7 @@ private void removeAllocationFailuresInjection(String node) {
     private void awaitShardAllocMaxRetries() throws Exception {
         var maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(internalCluster().getDefaultSettings());
         assertBusy(() -> {
-            var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
+            var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
             var index = state.getRoutingTable().index(INDEX);
             assertNotNull(index);
             var shard = index.shard(SHARD).primaryShard();
@@ -62,7 +74,7 @@ private void awaitShardAllocMaxRetries() throws Exception {

     private void awaitShardAllocSucceed() throws Exception {
         assertBusy(() -> {
-            var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
+            var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
             var index = state.getRoutingTable().index(INDEX);
             assertNotNull(index);
             var shard = index.shard(SHARD).primaryShard();
@@ -72,14 +84,77 @@ private void awaitShardAllocSucceed() throws Exception {
         });
     }

-    public void testResetFailuresOnNodeJoin() throws Exception {
+    public void testResetAllocationFailuresOnNodeJoin() throws Exception {
         var node1 = internalCluster().startNode();
         injectAllocationFailures(node1);
         prepareCreate(INDEX, indexSettings(1, 0)).execute();
         awaitShardAllocMaxRetries();
         removeAllocationFailuresInjection(node1);
-        internalCluster().startNode();
-        awaitShardAllocSucceed();
+        try (var mockLog = MockLog.capture(RoutingNodes.class)) {
+            var shardId = internalCluster().clusterService().state().routingTable().index(INDEX).shard(SHARD).shardId();
+            mockLog.addExpectation(
+                new MockLog.SeenEventExpectation(
+                    "log resetting failed allocations",
+                    RoutingNodes.class.getName(),
+                    Level.INFO,
+                    Strings.format(RoutingNodes.RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG, 1, List.of(shardId))
+                )
+            );
+            internalCluster().startNode();
+            awaitShardAllocSucceed();
+            mockLog.assertAllExpectationsMatched();
+        }
     }

+    public void testResetRelocationFailuresOnNodeJoin() throws Exception {
+        String node1 = internalCluster().startNode();
+        createIndex(INDEX, 1, 0);
+        ensureGreen(INDEX);
+        final var failRelocation = new AtomicBoolean(true);
+        String node2 = internalCluster().startNode();
+        internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(new IndexEventListener() {
+            @Override
+            public void beforeIndexCreated(Index index, Settings indexSettings) {
+                if (failRelocation.get()) {
+                    throw new RuntimeException("FAIL");
+                }
+            }
+        });
+        updateIndexSettings(Settings.builder().put(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", node1), INDEX);
+        ensureGreen(INDEX);
+        // await all relocation attempts are exhausted
+        var maxAttempts = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
+        assertBusy(() -> {
+            var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
+            var shard = state.routingTable().index(INDEX).shard(SHARD).primaryShard();
+            assertThat(shard, notNullValue());
+            assertThat(shard.relocationFailureInfo().failedRelocations(), equalTo(maxAttempts));
+        });
+        // ensure the shard remains started
+        var state = safeGet(clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).execute()).getState();
+        var shard = state.routingTable().index(INDEX).shard(SHARD).primaryShard();
+        assertThat(shard, notNullValue());
+        assertThat(shard.state(), equalTo(ShardRoutingState.STARTED));
+        assertThat(state.nodes().get(shard.currentNodeId()).getName(), equalTo(node1));
+        failRelocation.set(false);
+        // A new node joining should reset the counter and allow more relocation retries
+        try (var mockLog = MockLog.capture(RoutingNodes.class)) {
+            mockLog.addExpectation(
+                new MockLog.SeenEventExpectation(
+                    "log resetting failed relocations",
+                    RoutingNodes.class.getName(),
+                    Level.INFO,
+                    Strings.format(RoutingNodes.RESET_FAILED_RELOCATION_COUNTER_LOG_MSG, 1, List.of(shard.shardId()))
+                )
+            );
+            internalCluster().startNode();
+            assertBusy(() -> {
+                var stateAfterNodeJoin = internalCluster().clusterService().state();
+                var relocatedShard = stateAfterNodeJoin.routingTable().index(INDEX).shard(SHARD).primaryShard();
+                assertThat(relocatedShard, notNullValue());
+                assertThat(stateAfterNodeJoin.nodes().get(relocatedShard.currentNodeId()).getName(), not(equalTo(node1)));
+            });
+            mockLog.assertAllExpectationsMatched();
+        }
+    }
 }
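
In testResetRelocationFailuresOnNodeJoin above, relocation is forced by excluding node1 through an index-level allocation filter (INDEX_ROUTING_EXCLUDE_GROUP_PREFIX resolves to index.routing.allocation.exclude). A tiny sketch of what that setting looks like when built directly; the node name is a hypothetical placeholder:

    import org.elasticsearch.common.settings.Settings;

    public class ExcludeFilterSketch {
        public static void main(String[] args) {
            // Excluding a node by name forces the allocator to move the index's shards elsewhere,
            // which is what drives the relocation attempts (and failures) in the test.
            Settings exclude = Settings.builder()
                .put("index.routing.allocation.exclude._name", "node-1")
                .build();
            System.out.println(exclude.get("index.routing.allocation.exclude._name"));
        }
    }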

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 75 additions & 3 deletions
@@ -16,14 +16,19 @@
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
 import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;

 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -44,6 +49,8 @@
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;

+import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
+
 /**
  * {@link RoutingNodes} represents a copy the routing information contained in the {@link ClusterState cluster state}.
  * It can be either initialized as mutable or immutable allowing or disallowing changes to its elements.
@@ -60,6 +67,13 @@
  */
 public class RoutingNodes implements Iterable<RoutingNode> {

+    private static final Logger logger = LogManager.getLogger(RoutingNodes.class);
+    public static final String RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG =
+        "Resetting failure counter for %d shard(s) that have reached their max allocation retires (%s)";
+    public static final String RESET_FAILED_RELOCATION_COUNTER_LOG_MSG =
+        "Resetting failure counter for %d shard(s) that have reached their max relocation retries (%s)";
+    private static final int MAX_SHARDS_IN_LOG_MSG = 20;
+
     private final Map<String, RoutingNode> nodesToShards;

     private final UnassignedShards unassignedShards;
@@ -1298,14 +1312,47 @@ public boolean hasAllocationFailures() {
         }));
     }

-    public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
+    public boolean hasRelocationFailures() {
+        for (var shardRoutings : assignedShards.values()) {
+            for (var routing : shardRoutings) {
+                if (routing.relocationFailureInfo() != null && routing.relocationFailureInfo().failedRelocations() > 0) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    public void resetFailedCounter(RoutingAllocation allocation) {
+        final var observer = allocation.changes();
+        int shardsWithMaxFailedAllocations = 0;
+        int shardsWithMaxFailedRelocations = 0;
+        List<ShardId> topShardIdsWithFailedAllocations = new ArrayList<>();
+        List<ShardId> topShardIdsWithFailedRelocations = new ArrayList<>();
+
         final var unassignedIterator = unassigned().iterator();
         while (unassignedIterator.hasNext()) {
             ShardRouting shardRouting = unassignedIterator.next();
             UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
+            int failedAllocations = unassignedInfo.failedAllocations();
+            if (failedAllocations > 0) {
+                try {
+                    final var maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(
+                        allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
+                    );
+                    if (failedAllocations >= maxRetry) {
+                        shardsWithMaxFailedAllocations++;
+                        if (topShardIdsWithFailedAllocations.size() <= MAX_SHARDS_IN_LOG_MSG) {
+                            topShardIdsWithFailedAllocations.add(shardRouting.shardId());
+                        }
+                    }
+                } catch (IndexNotFoundException e) {
+                    // ignore
+                }
+            }
             unassignedIterator.updateUnassigned(
                 new UnassignedInfo(
-                    unassignedInfo.failedAllocations() > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.reason(),
+                    failedAllocations > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.reason(),
                     unassignedInfo.message(),
                     unassignedInfo.failure(),
                     0,
@@ -1317,7 +1364,7 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
                     unassignedInfo.lastAllocatedNodeId()
                 ),
                 shardRouting.recoverySource(),
-                routingChangesObserver
+                observer
             );
         }

@@ -1326,6 +1373,20 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
             for (ShardRouting shardRouting : routingNode) {
                 if (shardRouting.relocationFailureInfo() != null && shardRouting.relocationFailureInfo().failedRelocations() > 0) {
                     shardsWithRelocationFailures.add(shardRouting);
+                    try {
+                        int failedRelocations = shardRouting.relocationFailureInfo().failedRelocations();
+                        final var maxRetry = SETTING_ALLOCATION_MAX_RETRY.get(
+                            allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()
+                        );
+                        if (failedRelocations >= maxRetry) {
+                            shardsWithMaxFailedRelocations++;
+                            if (topShardIdsWithFailedRelocations.size() <= MAX_SHARDS_IN_LOG_MSG) {
+                                topShardIdsWithFailedRelocations.add(shardRouting.shardId());
+                            }
+                        }
+                    } catch (IndexNotFoundException e) {
+                        // ignore
+                    }
                 }
             }

@@ -1336,6 +1397,17 @@ public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
                 assignedShardsAdd(updated);
             }
         }
+
+        if (shardsWithMaxFailedAllocations > 0) {
+            logger.info(
+                Strings.format(RESET_FAILED_ALLOCATION_COUNTER_LOG_MSG, shardsWithMaxFailedAllocations, topShardIdsWithFailedAllocations)
+            );
+        }
+        if (shardsWithMaxFailedRelocations > 0) {
+            logger.info(
+                Strings.format(RESET_FAILED_RELOCATION_COUNTER_LOG_MSG, shardsWithMaxFailedRelocations, topShardIdsWithFailedRelocations)
+            );
+        }
     }

     /**
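
resetFailedCounter above counts every shard that has reached its retry limit but only keeps a bounded number of shard IDs (MAX_SHARDS_IN_LOG_MSG) for the INFO log line. A stripped-down sketch of that count-all, log-a-sample pattern in plain Java; the names and types are illustrative, not the production code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class BoundedLogSampleSketch {
        private static final int MAX_IDS_IN_LOG_MSG = 20;

        // Count every shard at or above the retry limit, but cap how many IDs end up in the message.
        static String summarize(Map<String, Integer> failedAttemptsByShard, int maxRetries) {
            int total = 0;
            List<String> sample = new ArrayList<>();
            for (var entry : failedAttemptsByShard.entrySet()) {
                if (entry.getValue() >= maxRetries) {
                    total++;
                    if (sample.size() < MAX_IDS_IN_LOG_MSG) {
                        sample.add(entry.getKey());
                    }
                }
            }
            return String.format("Resetting failure counter for %d shard(s) (%s)", total, sample);
        }

        public static void main(String[] args) {
            System.out.println(summarize(Map.of("[idx][0]", 5, "[idx][1]", 2), 5));
        }
    }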

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 60 additions & 2 deletions
@@ -12,13 +12,15 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.RestoreInProgress;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -573,15 +575,71 @@ public void addAllocFailuresResetListenerTo(ClusterService clusterService) {
         });

         clusterService.addListener((changeEvent) -> {
-            if (changeEvent.nodesAdded() && changeEvent.state().getRoutingNodes().hasAllocationFailures()) {
+            if (shouldResetAllocationFailures(changeEvent)) {
                 taskQueue.submitTask("reset-allocation-failures", (e) -> { assert MasterService.isPublishFailureException(e); }, null);
             }
         });
     }

+    /**
+     * We should reset allocation/relocation failure count to allow further retries when:
+     *
+     * 1. A new node joins the cluster.
+     * 2. A node shutdown metadata is added that could lead to a node being removed or replaced in the cluster.
+     *
+     * Note that removing a non-RESTART shutdown metadata from a node that is still in the cluster is treated similarly and
+     * will cause resetting the allocation/relocation failures.
+     */
+    private boolean shouldResetAllocationFailures(ClusterChangedEvent changeEvent) {
+        final var clusterState = changeEvent.state();
+
+        if (clusterState.getRoutingNodes().hasAllocationFailures() == false
+            && clusterState.getRoutingNodes().hasRelocationFailures() == false) {
+            return false;
+        }
+        if (changeEvent.nodesAdded()) {
+            return true;
+        }
+
+        final var currentNodeShutdowns = clusterState.metadata().nodeShutdowns();
+        final var previousNodeShutdowns = changeEvent.previousState().metadata().nodeShutdowns();
+
+        if (currentNodeShutdowns.equals(previousNodeShutdowns)) {
+            return false;
+        }
+
+        for (var currentShutdown : currentNodeShutdowns.getAll().entrySet()) {
+            var previousNodeShutdown = previousNodeShutdowns.get(currentShutdown.getKey());
+            if (currentShutdown.equals(previousNodeShutdown)) {
+                continue;
+            }
+            // A RESTART doesn't necessarily move around shards, so no need to consider it for a reset.
+            // Furthermore, once the node rejoins after restarting, there will be a reset if necessary.
+            if (currentShutdown.getValue().getType() == SingleNodeShutdownMetadata.Type.RESTART) {
+                continue;
+            }
+            // A node with no shutdown marker or a RESTART marker receives a non-RESTART shutdown marker
+            if (previousNodeShutdown == null || previousNodeShutdown.getType() == Type.RESTART) {
+                return true;
+            }
+        }
+
+        for (var previousShutdown : previousNodeShutdowns.getAll().entrySet()) {
+            var nodeId = previousShutdown.getKey();
+            // A non-RESTART marker is removed but the node is still in the cluster. We could re-attempt failed relocations/allocations.
+            if (currentNodeShutdowns.get(nodeId) == null
+                && previousShutdown.getValue().getType() != SingleNodeShutdownMetadata.Type.RESTART
+                && clusterState.nodes().get(nodeId) != null) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     private ClusterState rerouteWithResetFailedCounter(ClusterState clusterState) {
         RoutingAllocation allocation = createRoutingAllocation(clusterState, currentNanoTime());
-        allocation.routingNodes().resetFailedCounter(allocation.changes());
+        allocation.routingNodes().resetFailedCounter(allocation);
         reroute(allocation, routingAllocation -> shardsAllocator.allocate(routingAllocation, ActionListener.noop()));
         return buildResultAndLogHealthChange(clusterState, allocation, "reroute with reset failed counter");
     }
server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocator.java

Lines changed: 1 addition & 1 deletion
@@ -62,7 +62,7 @@ default RoutingExplanations execute(RoutingAllocation allocation, AllocationComm

         try {
             if (retryFailed) {
-                allocation.routingNodes().resetFailedCounter(allocation.changes());
+                allocation.routingNodes().resetFailedCounter(allocation);
             }
             return commands.execute(allocation, explain);
         } finally {
