Skip to content

Commit e88bcaf

Browse files
authored
Better handling of node ids from shutdown metadata (#128298)
Currently, the DiskThresholdMonitor code considers shutdown metadata to identify nodes that are being replaced. If the node-to-be-replaced (source) leaves the cluster before the corresponding shutdown metadata is removed from the cluster state, we can hit an NPE. This PR adds a test for that scenario and improves the code that handles the source and target node ids from shutdown metadata. Fixes #100201
1 parent b977b04 commit e88bcaf

File tree

3 files changed

+88
-5
lines changed

3 files changed

+88
-5
lines changed

docs/changelog/128298.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 128298
2+
summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
3+
area: Infra/Node Lifecycle
4+
type: bug
5+
issues:
6+
- 100201

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import java.util.HashSet;
5050
import java.util.List;
5151
import java.util.Map;
52-
import java.util.Objects;
5352
import java.util.Set;
5453
import java.util.concurrent.atomic.AtomicBoolean;
5554
import java.util.concurrent.atomic.AtomicLong;
@@ -376,9 +375,12 @@ public void onNewInfo(ClusterInfo info) {
376375
}
377376

378377
// Generate a map of node name to ID so we can use it to look up node replacement targets
379-
final Map<String, String> nodeNameToId = state.getRoutingNodes()
378+
final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
380379
.stream()
381-
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
380+
.collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));
381+
382+
// Generate a set of the valid node IDs so we can use it to filter valid sources
383+
final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
382384

383385
// Calculate both the source node id and the target node id of a "replace" type shutdown
384386
final Set<String> nodesIdsPartOfReplacement = state.metadata()
@@ -387,8 +389,8 @@ public void onNewInfo(ClusterInfo info) {
387389
.values()
388390
.stream()
389391
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
390-
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
391-
.filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
392+
.flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
393+
.filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
392394
.collect(Collectors.toSet());
393395

394396
// Generate a set of all the indices that exist on either the target or source of a node replacement
@@ -437,6 +439,11 @@ public void onNewInfo(ClusterInfo info) {
437439
}
438440
}
439441

442+
private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
443+
var ids = nodeNameToIds.get(meta.getTargetNodeName()); // The REPLACE target node might not still be in RoutingNodes
444+
return ids == null ? Stream.empty() : ids.stream();
445+
}
446+
440447
// exposed for tests to override
441448
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
442449
return DiskThresholdDecider.sizeOfUnaccountedShards(

server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,76 @@ public void testSkipDiskThresholdMonitorWhenStateNotRecovered() {
14091409
assertNull(result2.v2());
14101410
}
14111411

1412+
private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
1413+
final var projectId = randomProjectIdOrDefault();
1414+
final Metadata.Builder metadataBuilder = Metadata.builder()
1415+
.put(
1416+
ProjectMetadata.builder(projectId)
1417+
.put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
1418+
.build()
1419+
);
1420+
1421+
metadataBuilder.putCustom(
1422+
NodesShutdownMetadata.TYPE,
1423+
new NodesShutdownMetadata(
1424+
Collections.singletonMap(
1425+
"node1",
1426+
SingleNodeShutdownMetadata.builder()
1427+
.setNodeId("node1")
1428+
.setNodeEphemeralId("node1")
1429+
.setReason("testing")
1430+
.setType(SingleNodeShutdownMetadata.Type.REPLACE)
1431+
.setTargetNodeName("node3")
1432+
.setStartedAtMillis(randomNonNegativeLong())
1433+
.build()
1434+
)
1435+
)
1436+
);
1437+
1438+
final Metadata metadata = metadataBuilder.build();
1439+
final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
1440+
.addAsNew(metadata.getProject(projectId).index("test"))
1441+
.build();
1442+
DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder().add(newNormalNode("node2", "node2"));
1443+
// node1 which is replaced by node3 may or may not be in the cluster
1444+
if (sourceNodeInTable) {
1445+
discoveryNodes.add(newNormalNode("node1", "node1"));
1446+
}
1447+
// node3 which is to replace node1 may or may not be in the cluster
1448+
if (targetNodeInTable) {
1449+
discoveryNodes.add(newNormalNode("node3", "node3"));
1450+
}
1451+
final ClusterState clusterState = applyStartedShardsUntilNoChange(
1452+
ClusterState.builder(ClusterName.DEFAULT)
1453+
.metadata(metadata)
1454+
.routingTable(GlobalRoutingTable.builder().put(projectId, routingTable).build())
1455+
.nodes(discoveryNodes)
1456+
.build(),
1457+
createAllocationService(Settings.EMPTY)
1458+
);
1459+
final Index testIndex = routingTable.index("test").getIndex();
1460+
1461+
Map<String, DiskUsage> diskUsages = new HashMap<>();
1462+
diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
1463+
diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
1464+
final ClusterInfo clusterInfo = clusterInfo(diskUsages);
1465+
Tuple<Boolean, Set<Index>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
1466+
assertTrue(result.v1()); // reroute on new nodes
1467+
assertThat(result.v2(), contains(testIndex));
1468+
}
1469+
1470+
public void testSkipReplaceSourceNodeNotInRoutingTable() {
1471+
doTestSkipNodesNotInRoutingTable(false, true);
1472+
}
1473+
1474+
public void testSkipReplaceTargetNodeNotInRoutingTable() {
1475+
doTestSkipNodesNotInRoutingTable(true, false);
1476+
}
1477+
1478+
public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
1479+
doTestSkipNodesNotInRoutingTable(false, false);
1480+
}
1481+
14121482
// Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
14131483
// happen and any indices that should be marked as read-only.
14141484
private Tuple<Boolean, Set<Index>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {

0 commit comments

Comments
 (0)