Skip to content

Commit 4ad150c

Browse files
authored
Backport 91297 to 8.5 (#91429)
1 parent 1eeee3b commit 4ad150c

File tree

5 files changed

+237
-21
lines changed

5 files changed

+237
-21
lines changed

docs/changelog/91297.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 91297
2+
summary: Check for unassigned shards on node shutdown
3+
area: Infra/Core
4+
type: enhancement
5+
issues:
6+
- 88635

test/framework/src/main/java/org/elasticsearch/test/gateway/TestGatewayAllocator.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88

99
package org.elasticsearch.test.gateway;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.elasticsearch.cluster.node.DiscoveryNode;
1214
import org.elasticsearch.cluster.node.DiscoveryNodes;
1315
import org.elasticsearch.cluster.routing.ShardRouting;
16+
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
1417
import org.elasticsearch.cluster.routing.allocation.FailedShard;
1518
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
1619
import org.elasticsearch.gateway.AsyncShardFetch;
@@ -45,6 +48,8 @@
4548
*/
4649
public class TestGatewayAllocator extends GatewayAllocator {
4750

51+
private static final Logger LOGGER = LogManager.getLogger(TestGatewayAllocator.class);
52+
4853
Map<String /* node id */, Map<ShardId, ShardRouting>> knownAllocations = new HashMap<>();
4954
DiscoveryNodes currentNodes = DiscoveryNodes.EMPTY_NODES;
5055

@@ -126,6 +131,19 @@ public void allocateUnassigned(
126131
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
127132
}
128133

134+
@Override
135+
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
136+
assert unassignedShard.unassigned();
137+
assert routingAllocation.debugDecision();
138+
if (unassignedShard.primary()) {
139+
assert primaryShardAllocator != null;
140+
return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, LOGGER);
141+
} else {
142+
assert replicaShardAllocator != null;
143+
return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, LOGGER);
144+
}
145+
}
146+
129147
/**
130148
* manually add a specific shard to the allocations the gateway keeps track of
131149
*/

x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownShardsIT.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
1212
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
1313
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
14+
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1415
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
1516
import org.elasticsearch.action.index.IndexRequestBuilder;
1617
import org.elasticsearch.cluster.ClusterState;
@@ -410,6 +411,41 @@ public Settings onNodeStopped(String nodeName) throws Exception {
410411
ensureGreen("myindex");
411412
}
412413

414+
public void testNodeShutdownWithUnassignedShards() throws Exception {
415+
final String nodeA = internalCluster().startNode();
416+
final String nodeAId = getNodeId(nodeA);
417+
418+
createIndex(
419+
"index",
420+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
421+
);
422+
423+
ensureYellow("index");
424+
425+
// Start a second node, so the replica will be on nodeB
426+
final String nodeB = internalCluster().startNode();
427+
final String nodeBId = getNodeId(nodeB);
428+
ensureGreen("index");
429+
430+
client().admin()
431+
.cluster()
432+
.updateSettings(
433+
new ClusterUpdateSettingsRequest().persistentSettings(Settings.builder().put("cluster.routing.allocation.enable", "none"))
434+
);
435+
436+
assertThat(client().admin().indices().prepareFlush("index").get().getSuccessfulShards(), equalTo(2));
437+
assertThat(client().admin().indices().prepareRefresh("index").get().getSuccessfulShards(), equalTo(2));
438+
439+
internalCluster().restartNode(nodeA);
440+
internalCluster().restartNode(nodeB);
441+
442+
assertThat(client().admin().cluster().prepareHealth("index").get().getUnassignedShards(), equalTo(1));
443+
444+
putNodeShutdown(nodeAId, SingleNodeShutdownMetadata.Type.REMOVE, null);
445+
446+
assertBusy(() -> assertNodeShutdownStatus(nodeAId, STALLED));
447+
}
448+
413449
private void indexRandomData(String index) throws Exception {
414450
int numDocs = scaledRandomIntBetween(100, 1000);
415451
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];

x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusAction.java

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,16 @@
4141

4242
import java.util.ArrayList;
4343
import java.util.Arrays;
44+
import java.util.Iterator;
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.Objects;
4748
import java.util.Optional;
4849
import java.util.Set;
4950
import java.util.concurrent.atomic.AtomicInteger;
51+
import java.util.function.UnaryOperator;
5052
import java.util.stream.Collectors;
53+
import java.util.stream.Stream;
5154

5255
import static org.elasticsearch.cluster.metadata.ShutdownShardMigrationStatus.NODE_ALLOCATION_DECISION_KEY;
5356
import static org.elasticsearch.core.Strings.format;
@@ -195,7 +198,48 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
195198
return new ShutdownShardMigrationStatus(SingleNodeShutdownMetadata.Status.COMPLETE, 0);
196199
}
197200

198-
// First, check if there are any shards currently on this node, and if there are any relocating shards
201+
final RoutingAllocation allocation = new RoutingAllocation(
202+
allocationDeciders,
203+
currentState,
204+
clusterInfoService.getClusterInfo(),
205+
snapshotsInfoService.snapshotShardSizes(),
206+
System.nanoTime()
207+
);
208+
allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
209+
210+
// We also need the set of node IDs which are currently shutting down.
211+
Set<String> shuttingDownNodes = currentState.metadata().nodeShutdowns().keySet();
212+
213+
// Check if we have any unassigned primary shards that have this nodeId as their lastAllocatedNodeId
214+
var unassignedShards = Stream.iterate(
215+
currentState.getRoutingNodes().unassigned().iterator(),
216+
Iterator::hasNext,
217+
UnaryOperator.identity()
218+
)
219+
.map(Iterator::next)
220+
.filter(s -> Objects.equals(s.unassignedInfo().getLastAllocatedNodeId(), nodeId))
221+
.filter(s -> s.primary() || hasShardCopyOnAnotherNode(currentState, s, shuttingDownNodes) == false)
222+
.toList();
223+
224+
if (unassignedShards.isEmpty() == false) {
225+
var shardRouting = unassignedShards.get(0);
226+
ShardAllocationDecision decision = allocationService.explainShardAllocation(shardRouting, allocation);
227+
228+
return new ShutdownShardMigrationStatus(
229+
SingleNodeShutdownMetadata.Status.STALLED,
230+
unassignedShards.size(),
231+
format(
232+
"shard [%s] [%s] of index [%s] is unassigned, see [%s] for details or use the cluster allocation explain API",
233+
shardRouting.shardId().getId(),
234+
shardRouting.primary() ? "primary" : "replica",
235+
shardRouting.index().getName(),
236+
NODE_ALLOCATION_DECISION_KEY
237+
),
238+
decision
239+
);
240+
}
241+
242+
// Check if there are any shards currently on this node, and if there are any relocating shards
199243
int startedShards = currentState.getRoutingNodes().node(nodeId).numberOfShardsWithState(ShardRoutingState.STARTED);
200244
int relocatingShards = currentState.getRoutingNodes().node(nodeId).numberOfShardsWithState(ShardRoutingState.RELOCATING);
201245
int initializingShards = currentState.getRoutingNodes().node(nodeId).numberOfShardsWithState(ShardRoutingState.INITIALIZING);
@@ -217,18 +261,6 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
217261
}
218262

219263
// If there's no relocating shards and shards still on this node, we need to figure out why
220-
final RoutingAllocation allocation = new RoutingAllocation(
221-
allocationDeciders,
222-
currentState,
223-
clusterInfoService.getClusterInfo(),
224-
snapshotsInfoService.snapshotShardSizes(),
225-
System.nanoTime()
226-
);
227-
allocation.setDebugMode(RoutingAllocation.DebugMode.EXCLUDE_YES_DECISIONS);
228-
229-
// We also need the set of node IDs which are currently shutting down.
230-
Set<String> shuttingDownNodes = currentState.metadata().nodeShutdowns().keySet();
231-
232264
AtomicInteger shardsToIgnoreForFinalStatus = new AtomicInteger(0);
233265

234266
// Explain shard allocations until we find one that can't move, then stop (as `findFirst` short-circuits)
@@ -249,14 +281,7 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
249281
.filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.YES) == false)
250282
// If the shard that can't move is on every node in the cluster, we shouldn't be `STALLED` on it.
251283
.filter(pair -> {
252-
final boolean hasShardCopyOnOtherNode = currentState.routingTable()
253-
.allShards(pair.v1().index().getName())
254-
.stream()
255-
.filter(shardRouting -> shardRouting.id() == pair.v1().id())
256-
// If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
257-
// of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
258-
.filter(ShardRouting::started)
259-
.anyMatch(routing -> shuttingDownNodes.contains(routing.currentNodeId()) == false);
284+
final boolean hasShardCopyOnOtherNode = hasShardCopyOnAnotherNode(currentState, pair.v1(), shuttingDownNodes);
260285
if (hasShardCopyOnOtherNode) {
261286
shardsToIgnoreForFinalStatus.incrementAndGet();
262287
}
@@ -305,6 +330,17 @@ static ShutdownShardMigrationStatus shardMigrationStatus(
305330
}
306331
}
307332

333+
private static boolean hasShardCopyOnAnotherNode(ClusterState clusterState, ShardRouting shardRouting, Set<String> shuttingDownNodes) {
334+
return clusterState.routingTable()
335+
.allShards(shardRouting.index().getName())
336+
.stream()
337+
.filter(sr -> sr.id() == shardRouting.id())
338+
// If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
339+
// of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
340+
.filter(ShardRouting::started)
341+
.anyMatch(routing -> shuttingDownNodes.contains(routing.currentNodeId()) == false);
342+
}
343+
308344
@Override
309345
protected ClusterBlockException checkBlock(GetShutdownStatusAction.Request request, ClusterState state) {
310346
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);

x-pack/plugin/shutdown/src/test/java/org/elasticsearch/xpack/shutdown/TransportGetShutdownStatusActionTests.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.routing.ShardRouting;
2525
import org.elasticsearch.cluster.routing.ShardRoutingState;
2626
import org.elasticsearch.cluster.routing.TestShardRouting;
27+
import org.elasticsearch.cluster.routing.UnassignedInfo;
2728
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2829
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2930
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
@@ -34,26 +35,32 @@
3435
import org.elasticsearch.cluster.routing.allocation.decider.NodeShutdownAllocationDecider;
3536
import org.elasticsearch.common.settings.Settings;
3637
import org.elasticsearch.common.transport.TransportAddress;
38+
import org.elasticsearch.gateway.GatewayAllocator;
3739
import org.elasticsearch.index.Index;
3840
import org.elasticsearch.index.shard.ShardId;
3941
import org.elasticsearch.node.Node;
4042
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
4143
import org.elasticsearch.snapshots.SnapshotsInfoService;
4244
import org.elasticsearch.test.ESTestCase;
45+
import org.elasticsearch.test.gateway.TestGatewayAllocator;
4346
import org.hamcrest.Matcher;
4447
import org.junit.Before;
4548

4649
import java.util.Collections;
4750
import java.util.HashMap;
4851
import java.util.List;
4952
import java.util.Map;
53+
import java.util.Set;
5054
import java.util.concurrent.atomic.AtomicReference;
5155

5256
import static org.hamcrest.Matchers.allOf;
5357
import static org.hamcrest.Matchers.containsString;
5458
import static org.hamcrest.Matchers.equalTo;
5559
import static org.hamcrest.Matchers.is;
5660
import static org.hamcrest.Matchers.nullValue;
61+
import static org.mockito.ArgumentMatchers.anyInt;
62+
import static org.mockito.Mockito.doAnswer;
63+
import static org.mockito.Mockito.spy;
5764

5865
public class TransportGetShutdownStatusActionTests extends ESTestCase {
5966
public static final String SHUTTING_DOWN_NODE_ID = "node1";
@@ -122,6 +129,7 @@ public Decision canRebalance(RoutingAllocation allocation) {
122129
clusterInfoService,
123130
snapshotsInfoService
124131
);
132+
allocationService.setExistingShardsAllocators(Map.of(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator()));
125133
}
126134

127135
/**
@@ -349,6 +357,60 @@ public void testStalled() {
349357
);
350358
}
351359

360+
/**
361+
* Ensure we can detect stalled migrations when we have unassigned shards that had the shutting down node as their last known
362+
* node id
363+
*/
364+
public void testStalledUnassigned() {
365+
Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
366+
IndexMetadata imd = spy(generateIndexMetadata(index, 3, 0));
367+
// make sure the TestGatewayAllocator stays in sync always, avoid flaky tests
368+
doAnswer(i -> {
369+
if ((Integer) i.getArgument(0) < 2) {
370+
return Set.of(LIVE_NODE_ID);
371+
}
372+
return Set.of(SHUTTING_DOWN_NODE_ID);
373+
}).when(imd).inSyncAllocationIds(anyInt());
374+
375+
var shard0 = TestShardRouting.newShardRouting(new ShardId(index, 0), LIVE_NODE_ID, true, ShardRoutingState.STARTED);
376+
var shard1 = TestShardRouting.newShardRouting(new ShardId(index, 1), LIVE_NODE_ID, true, ShardRoutingState.STARTED);
377+
378+
// we should stall the node if we find an unassigned shard with lastAllocatedNodeId matching the shutting down node
379+
var unassigned = makeUnassignedShard(index, 2, SHUTTING_DOWN_NODE_ID, true);
380+
381+
assertShardMigration(
382+
getUnassignedShutdownStatus(index, imd, shard0, shard1, unassigned),
383+
SingleNodeShutdownMetadata.Status.STALLED,
384+
1,
385+
allOf(containsString(index.getName()), containsString("[2] [primary]"))
386+
);
387+
388+
// if the shard is unassigned, but it's not a primary on this node, we shouldn't stall
389+
var shard2 = TestShardRouting.newShardRouting(new ShardId(index, 2), LIVE_NODE_ID, true, ShardRoutingState.STARTED);
390+
var unassignedReplica = makeUnassignedShard(index, 2, SHUTTING_DOWN_NODE_ID, false);
391+
392+
var s = getUnassignedShutdownStatus(index, imd, shard0, shard1, shard2, unassignedReplica);
393+
assertShardMigration(s, SingleNodeShutdownMetadata.Status.COMPLETE, 0, nullValue());
394+
395+
// check if we correctly count all of the unassigned shards
396+
var unassigned3 = makeUnassignedShard(index, 3, SHUTTING_DOWN_NODE_ID, true);
397+
398+
assertShardMigration(
399+
getUnassignedShutdownStatus(index, imd, shard0, shard1, unassigned3, unassigned),
400+
SingleNodeShutdownMetadata.Status.STALLED,
401+
2,
402+
allOf(containsString(index.getName()), containsString("[2] [primary]"))
403+
);
404+
405+
// check if we correctly walk all of the unassigned shards, shard 2 replica, shard 3 primary
406+
assertShardMigration(
407+
getUnassignedShutdownStatus(index, imd, shard0, shard1, shard2, unassignedReplica, unassigned3),
408+
SingleNodeShutdownMetadata.Status.STALLED,
409+
1,
410+
allOf(containsString(index.getName()), containsString("[3] [primary]"))
411+
);
412+
}
413+
352414
public void testNotStalledIfAllShardsHaveACopyOnAnotherNode() {
353415
Index index = new Index(randomAlphaOfLength(5), randomAlphaOfLengthBetween(1, 20));
354416
IndexMetadata imd = generateIndexMetadata(index, 3, 0);
@@ -571,4 +633,62 @@ private ClusterState createTestClusterState(
571633
.routingTable(indexRoutingTable)
572634
.build();
573635
}
636+
637+
private UnassignedInfo makeUnassignedInfo(String nodeId) {
638+
return new UnassignedInfo(
639+
UnassignedInfo.Reason.ALLOCATION_FAILED,
640+
"testing",
641+
null,
642+
1,
643+
System.nanoTime(),
644+
System.currentTimeMillis(),
645+
false,
646+
UnassignedInfo.AllocationStatus.NO_ATTEMPT,
647+
Collections.emptySet(),
648+
nodeId
649+
);
650+
}
651+
652+
private ShardRouting makeUnassignedShard(Index index, int shardId, String nodeId, boolean primary) {
653+
var unsignedInfo = makeUnassignedInfo(nodeId);
654+
655+
return TestShardRouting.newShardRouting(
656+
new ShardId(index, shardId),
657+
null,
658+
null,
659+
primary,
660+
ShardRoutingState.UNASSIGNED,
661+
unsignedInfo
662+
);
663+
}
664+
665+
private ShutdownShardMigrationStatus getUnassignedShutdownStatus(Index index, IndexMetadata imd, ShardRouting... shards) {
666+
var indexRoutingTableBuilder = IndexRoutingTable.builder(index);
667+
668+
for (var routing : shards) {
669+
indexRoutingTableBuilder.addShard(routing);
670+
}
671+
672+
var indexRoutingTable = indexRoutingTableBuilder.build();
673+
674+
// Force a decision of NO for all moves and new allocations, simulating a decider that's stuck
675+
canAllocate.set((r, n, a) -> Decision.NO);
676+
// And the remain decider simulates NodeShutdownAllocationDecider
677+
canRemain.set((r, n, a) -> n.nodeId().equals(SHUTTING_DOWN_NODE_ID) ? Decision.NO : Decision.YES);
678+
679+
RoutingTable.Builder routingTable = RoutingTable.builder();
680+
routingTable.add(indexRoutingTable);
681+
ClusterState state = createTestClusterState(routingTable.build(), List.of(imd), SingleNodeShutdownMetadata.Type.REMOVE);
682+
683+
return TransportGetShutdownStatusAction.shardMigrationStatus(
684+
state,
685+
SHUTTING_DOWN_NODE_ID,
686+
SingleNodeShutdownMetadata.Type.REMOVE,
687+
true,
688+
clusterInfoService,
689+
snapshotsInfoService,
690+
allocationService,
691+
allocationDeciders
692+
);
693+
}
574694
}

0 commit comments

Comments
 (0)