Skip to content

Commit b66461d

Browse files
committed
Better handling of node ids from shutdown metadata (elastic#128298)
Currently, the DiskThresholdMonitor code considers shutdown metadata to identify nodes that are being replaced. If the node-to-be-replaced (source) leaves the cluster before the corresponding shutdown metadata is removed from the cluster state, we can hit an NPE. This PR adds a test for that and improves the code a bit to handle node ids for source and target from shutdown metadata. Fixes elastic#100201
1 parent ab89b87 commit b66461d

File tree

3 files changed

+88
-5
lines changed

3 files changed

+88
-5
lines changed

docs/changelog/128298.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
pr: 128298
summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
area: Infra/Node Lifecycle
type: bug
issues:
 - 100201

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.HashSet;
4343
import java.util.List;
4444
import java.util.Map;
45-
import java.util.Objects;
4645
import java.util.Set;
4746
import java.util.concurrent.atomic.AtomicBoolean;
4847
import java.util.concurrent.atomic.AtomicLong;
@@ -368,9 +367,12 @@ public void onNewInfo(ClusterInfo info) {
368367
}
369368

370369
// Generate a map of node name to ID so we can use it to look up node replacement targets
371-
final Map<String, String> nodeNameToId = state.getRoutingNodes()
370+
final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
372371
.stream()
373-
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
372+
.collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));
373+
374+
// Generate a set of the valid node IDs so we can use it to filter valid sources
375+
final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
374376

375377
// Calculate both the source node id and the target node id of a "replace" type shutdown
376378
final Set<String> nodesIdsPartOfReplacement = state.metadata()
@@ -379,8 +381,8 @@ public void onNewInfo(ClusterInfo info) {
379381
.values()
380382
.stream()
381383
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
382-
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
383-
.filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
384+
.flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
385+
.filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
384386
.collect(Collectors.toSet());
385387

386388
// Generate a set of all the indices that exist on either the target or source of a node replacement
@@ -420,6 +422,11 @@ public void onNewInfo(ClusterInfo info) {
420422
}
421423
}
422424

425+
private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
426+
var ids = nodeNameToIds.get(meta.getTargetNodeName()); // The REPLACE target node might not still be in RoutingNodes
427+
return ids == null ? Stream.empty() : ids.stream();
428+
}
429+
423430
// exposed for tests to override
424431
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
425432
return DiskThresholdDecider.sizeOfUnaccountedShards(

server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,76 @@ public void testSkipDiskThresholdMonitorWhenStateNotRecovered() {
13371337
assertNull(result2.v2());
13381338
}
13391339

1340+
private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
1341+
final var projectId = randomProjectIdOrDefault();
1342+
final Metadata.Builder metadataBuilder = Metadata.builder()
1343+
.put(
1344+
ProjectMetadata.builder(projectId)
1345+
.put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
1346+
.build()
1347+
);
1348+
1349+
metadataBuilder.putCustom(
1350+
NodesShutdownMetadata.TYPE,
1351+
new NodesShutdownMetadata(
1352+
Collections.singletonMap(
1353+
"node1",
1354+
SingleNodeShutdownMetadata.builder()
1355+
.setNodeId("node1")
1356+
.setNodeEphemeralId("node1")
1357+
.setReason("testing")
1358+
.setType(SingleNodeShutdownMetadata.Type.REPLACE)
1359+
.setTargetNodeName("node3")
1360+
.setStartedAtMillis(randomNonNegativeLong())
1361+
.build()
1362+
)
1363+
)
1364+
);
1365+
1366+
final Metadata metadata = metadataBuilder.build();
1367+
final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
1368+
.addAsNew(metadata.getProject(projectId).index("test"))
1369+
.build();
1370+
DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder().add(newNormalNode("node2", "node2"));
1371+
// node1 which is replaced by node3 may or may not be in the cluster
1372+
if (sourceNodeInTable) {
1373+
discoveryNodes.add(newNormalNode("node1", "node1"));
1374+
}
1375+
// node3 which is to replace node1 may or may not be in the cluster
1376+
if (targetNodeInTable) {
1377+
discoveryNodes.add(newNormalNode("node3", "node3"));
1378+
}
1379+
final ClusterState clusterState = applyStartedShardsUntilNoChange(
1380+
ClusterState.builder(ClusterName.DEFAULT)
1381+
.metadata(metadata)
1382+
.routingTable(GlobalRoutingTable.builder().put(projectId, routingTable).build())
1383+
.nodes(discoveryNodes)
1384+
.build(),
1385+
createAllocationService(Settings.EMPTY)
1386+
);
1387+
final Index testIndex = routingTable.index("test").getIndex();
1388+
1389+
Map<String, DiskUsage> diskUsages = new HashMap<>();
1390+
diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
1391+
diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
1392+
final ClusterInfo clusterInfo = clusterInfo(diskUsages);
1393+
Tuple<Boolean, Set<Index>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
1394+
assertTrue(result.v1()); // reroute on new nodes
1395+
assertThat(result.v2(), contains(testIndex));
1396+
}
1397+
1398+
public void testSkipReplaceSourceNodeNotInRoutingTable() {
1399+
doTestSkipNodesNotInRoutingTable(false, true);
1400+
}
1401+
1402+
public void testSkipReplaceTargetNodeNotInRoutingTable() {
1403+
doTestSkipNodesNotInRoutingTable(true, false);
1404+
}
1405+
1406+
public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
1407+
doTestSkipNodesNotInRoutingTable(false, false);
1408+
}
1409+
13401410
// Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
13411411
// happen and any indices that should be marked as read-only.
13421412
private Tuple<Boolean, Set<String>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {

0 commit comments

Comments
 (0)