diff --git a/docs/changelog/128298.yaml b/docs/changelog/128298.yaml
new file mode 100644
index 0000000000000..bfa16cb509304
--- /dev/null
+++ b/docs/changelog/128298.yaml
@@ -0,0 +1,6 @@
+pr: 128298
+summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
+area: Infra/Node Lifecycle
+type: bug
+issues:
+ - 100201
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 0727e9aa62e2e..eb9175ff12fb1 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -42,7 +42,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -368,9 +367,12 @@ public void onNewInfo(ClusterInfo info) {
         }
 
         // Generate a map of node name to ID so we can use it to look up node replacement targets
-        final Map<String, String> nodeNameToId = state.getRoutingNodes()
+        final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
             .stream()
-            .collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
+            .collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));
+
+        // Generate a set of the valid node IDs so we can use it to filter valid sources
+        final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
 
         // Calculate both the source node id and the target node id of a "replace" type shutdown
         final Set<String> nodesIdsPartOfReplacement = state.metadata()
@@ -379,8 +381,8 @@ public void onNewInfo(ClusterInfo info) {
             .values()
             .stream()
             .filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
-            .flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
-            .filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
+            .flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
+            .filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
             .collect(Collectors.toSet());
 
         // Generate a set of all the indices that exist on either the target or source of a node replacement
@@ -420,6 +422,11 @@ public void onNewInfo(ClusterInfo info) {
         }
     }
 
+    private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
+        var ids = nodeNameToIds.get(meta.getTargetNodeName()); // The REPLACE target node might not still be in RoutingNodes
+        return ids == null ? Stream.empty() : ids.stream();
+    }
+
     // exposed for tests to override
     long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
         return DiskThresholdDecider.sizeOfUnaccountedShards(
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index 5a6e8e65495a0..61028d9080a90 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.MockLog;
@@ -1337,6 +1338,66 @@ public void testSkipDiskThresholdMonitorWhenStateNotRecovered() {
         assertNull(result2.v2());
     }
 
+    private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
+        Metadata.Builder metadataBuilder = Metadata.builder()
+            .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
+            .putCustom(
+                NodesShutdownMetadata.TYPE,
+                new NodesShutdownMetadata(
+                    Collections.singletonMap(
+                        "node1",
+                        SingleNodeShutdownMetadata.builder()
+                            .setNodeId("node1")
+                            .setNodeEphemeralId("node1")
+                            .setReason("testing")
+                            .setType(SingleNodeShutdownMetadata.Type.REPLACE)
+                            .setTargetNodeName("node3")
+                            .setStartedAtMillis(randomNonNegativeLong())
+                            .build()
+                    )
+                )
+            );
+
+        final Metadata metadata = metadataBuilder.build();
+        final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
+            .addAsNew(metadata.index("test"))
+            .build();
+        DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder().add(newNormalNode("node2", "node2"));
+        // node1 which is replaced by node3 may or may not be in the cluster
+        if (sourceNodeInTable) {
+            discoveryNodes.add(newNormalNode("node1", "node1"));
+        }
+        // node3 which is to replace node1 may or may not be in the cluster
+        if (targetNodeInTable) {
+            discoveryNodes.add(newNormalNode("node3", "node3"));
+        }
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(discoveryNodes).build(),
+            createAllocationService(Settings.EMPTY)
+        );
+        final Index testIndex = routingTable.index("test").getIndex();
+
+        Map<String, DiskUsage> diskUsages = new HashMap<>();
+        diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        final ClusterInfo clusterInfo = clusterInfo(diskUsages);
+        Tuple<Boolean, Set<String>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
+        assertTrue(result.v1()); // reroute on new nodes
+        assertEquals(Set.of("test"), result.v2());
+    }
+
+    public void testSkipReplaceSourceNodeNotInRoutingTable() {
+        doTestSkipNodesNotInRoutingTable(false, true);
+    }
+
+    public void testSkipReplaceTargetNodeNotInRoutingTable() {
+        doTestSkipNodesNotInRoutingTable(true, false);
+    }
+
+    public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
+        doTestSkipNodesNotInRoutingTable(false, false);
+    }
+
     // Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
     // happen and any indices that should be marked as read-only.
     private Tuple<Boolean, Set<String>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {
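
The lookup pattern the patch introduces is easier to see outside the diff. Below is a minimal, standalone Java sketch (not Elasticsearch code: the Node and ReplaceShutdown records and all ids are invented for illustration) of the same idea: group node names to lists of node ids, collect the set of ids still present in the routing nodes, and filter the shutdown metadata's source and target ids against that set, so ids of nodes that have already left (or targets that have not yet joined) are simply dropped instead of flowing through as nulls.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone illustration only; Node stands in for RoutingNode and
// ReplaceShutdown stands in for a REPLACE-type SingleNodeShutdownMetadata entry.
public class ReplaceShutdownLookupSketch {

    record Node(String id, String name) {}

    record ReplaceShutdown(String sourceNodeId, String targetNodeName) {}

    public static void main(String[] args) {
        // node1 is being replaced by a node named "node3" that has not joined yet,
        // and node1 itself has already left the cluster; only node2 remains.
        List<Node> routingNodes = List.of(new Node("id2", "node2"));
        List<ReplaceShutdown> replacements = List.of(new ReplaceShutdown("id1", "node3"));

        // Node names are not guaranteed unique, so collect name -> list of ids rather than name -> id.
        Map<String, List<String>> nodeNameToIds = routingNodes.stream()
            .collect(Collectors.groupingBy(Node::name, Collectors.mapping(Node::id, Collectors.toList())));

        // Only ids that are actually present in the routing nodes are valid.
        Set<String> routingNodeIds = routingNodes.stream().map(Node::id).collect(Collectors.toSet());

        // Combine source ids with (possibly absent) target ids, then drop anything not in the cluster.
        Set<String> nodeIdsPartOfReplacement = replacements.stream()
            .flatMap(r -> Stream.concat(
                Stream.of(r.sourceNodeId()),
                nodeNameToIds.getOrDefault(r.targetNodeName(), List.of()).stream()))
            .filter(routingNodeIds::contains)
            .collect(Collectors.toSet());

        // Neither the departed source nor the not-yet-joined target survives the filter.
        System.out.println(nodeIdsPartOfReplacement); // prints []
    }
}

Compared with the previous toMap plus Objects::nonNull approach, filtering on membership in the routing-node id set covers both directions: a missing REPLACE target and a REPLACE source that has already been removed, which is the case the changelog entry describes.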