Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/128298.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 128298
summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
area: Infra/Node Lifecycle
type: bug
issues:
- 100201
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -368,9 +367,12 @@ public void onNewInfo(ClusterInfo info) {
}

// Generate a map of node name to IDs (a name may match more than one node) so we can use it to look up node replacement targets
final Map<String, String> nodeNameToId = state.getRoutingNodes()
final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
.stream()
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
.collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));

// Generate a set of the valid node IDs so we can use it to filter valid sources
final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());

// Calculate both the source node id and the target node id of a "replace" type shutdown
final Set<String> nodesIdsPartOfReplacement = state.metadata()
Expand All @@ -379,8 +381,8 @@ public void onNewInfo(ClusterInfo info) {
.values()
.stream()
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
.filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
.flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
.filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
.collect(Collectors.toSet());

// Generate a set of all the indices that exist on either the target or source of a node replacement
Expand Down Expand Up @@ -420,6 +422,11 @@ public void onNewInfo(ClusterInfo info) {
}
}

/**
 * Looks up the routing-node IDs for the REPLACE shutdown's target node name.
 * The target node might not (yet, or any longer) be present in RoutingNodes,
 * in which case this contributes nothing to the caller's stream.
 */
private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
    final List<String> matchingIds = nodeNameToIds.get(meta.getTargetNodeName());
    if (matchingIds == null) {
        return Stream.empty();
    }
    return matchingIds.stream();
}

// exposed for tests to override
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
return DiskThresholdDecider.sizeOfUnaccountedShards(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.MockLog;
Expand Down Expand Up @@ -1337,6 +1338,66 @@ public void testSkipDiskThresholdMonitorWhenStateNotRecovered() {
assertNull(result2.v2());
}

/**
 * Runs the disk threshold monitor against a cluster where "node1" is the source of a REPLACE-type
 * shutdown targeting "node3", with either node possibly absent from the discovery nodes, and asserts
 * the monitor still requests a reroute and flags the "test" index rather than failing on the missing node.
 *
 * Fix applied here: the local {@code testIndex} ({@code routingTable.index("test").getIndex()}) was
 * computed but never used, so it has been removed.
 *
 * @param sourceNodeInTable whether the shutdown source node ("node1") is present in the cluster
 * @param targetNodeInTable whether the replacement target node ("node3") is present in the cluster
 */
private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
    Metadata.Builder metadataBuilder = Metadata.builder()
        .put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
        .putCustom(
            NodesShutdownMetadata.TYPE,
            new NodesShutdownMetadata(
                Collections.singletonMap(
                    "node1",
                    SingleNodeShutdownMetadata.builder()
                        .setNodeId("node1")
                        .setNodeEphemeralId("node1")
                        .setReason("testing")
                        .setType(SingleNodeShutdownMetadata.Type.REPLACE)
                        .setTargetNodeName("node3")
                        .setStartedAtMillis(randomNonNegativeLong())
                        .build()
                )
            )
        );

    final Metadata metadata = metadataBuilder.build();
    final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
        .addAsNew(metadata.index("test"))
        .build();
    DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder().add(newNormalNode("node2", "node2"));
    // node1 which is replaced by node3 may or may not be in the cluster
    if (sourceNodeInTable) {
        discoveryNodes.add(newNormalNode("node1", "node1"));
    }
    // node3 which is to replace node1 may or may not be in the cluster
    if (targetNodeInTable) {
        discoveryNodes.add(newNormalNode("node3", "node3"));
    }
    final ClusterState clusterState = applyStartedShardsUntilNoChange(
        ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(discoveryNodes).build(),
        createAllocationService(Settings.EMPTY)
    );

    // Very little free space (between 0 and 4 of 100) so the monitor is expected to flag the index below
    Map<String, DiskUsage> diskUsages = new HashMap<>();
    diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
    diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
    final ClusterInfo clusterInfo = clusterInfo(diskUsages);
    Tuple<Boolean, Set<String>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
    assertTrue(result.v1()); // reroute on new nodes
    assertEquals(Set.of("test"), result.v2());
}

// REPLACE source node ("node1") absent from the cluster — per the changelog, this previously caused an NPE
// when resolving node ids from shutdown metadata.
public void testSkipReplaceSourceNodeNotInRoutingTable() {
doTestSkipNodesNotInRoutingTable(false, true);
}

// REPLACE target node ("node3") absent from the cluster — the monitor must tolerate a target name that
// resolves to no routing node.
public void testSkipReplaceTargetNodeNotInRoutingTable() {
doTestSkipNodesNotInRoutingTable(true, false);
}

// Both the REPLACE source ("node1") and target ("node3") absent from the cluster — the most extreme case
// of shutdown metadata referencing nodes that are no longer present.
public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
doTestSkipNodesNotInRoutingTable(false, false);
}

// Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
// happen and any indices that should be marked as read-only.
private Tuple<Boolean, Set<String>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {
Expand Down