docs/changelog/128298.yaml (6 changes: 6 additions & 0 deletions)
@@ -0,0 +1,6 @@
pr: 128298
summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
area: Infra/Node Lifecycle
type: bug
issues:
- 100201
@@ -49,7 +49,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -376,9 +375,12 @@ public void onNewInfo(ClusterInfo info) {
}

// Generate a map of node name to ID so we can use it to look up node replacement targets
final Map<String, String> nodeNameToId = state.getRoutingNodes()
final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
.stream()
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
.collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));

// Generate a set of the valid node IDs so we can use it to filter valid sources
final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());

// Calculate both the source node id and the target node id of a "replace" type shutdown
final Set<String> nodesIdsPartOfReplacement = state.metadata()
@@ -387,8 +389,8 @@ public void onNewInfo(ClusterInfo info) {
.values()
.stream()
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
.filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
.flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
.filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
.collect(Collectors.toSet());

// Generate a set of all the indices that exist on either the target or source of a node replacement
@@ -437,6 +439,11 @@ public void onNewInfo(ClusterInfo info) {
}
}

private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
var ids = nodeNameToIds.get(meta.getTargetNodeName()); // The REPLACE target node might not still be in RoutingNodes
return ids == null ? Stream.empty() : ids.stream();
}

// exposed for tests to override
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
return DiskThresholdDecider.sizeOfUnaccountedShards(
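To make the intent of the change above easier to follow outside the Elasticsearch codebase, here is a minimal, self-contained sketch of the same collection pattern: grouping node ids by node name (instead of Collectors.toMap, which keeps only one id per name) and then filtering against the ids that are actually present, so ids of already-removed nodes are dropped rather than passed downstream. The NodeStub record and the sample data are hypothetical stand-ins for RoutingNode and the shutdown metadata, not the real Elasticsearch types.

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReplaceShutdownLookupSketch {

    // Hypothetical stand-in for RoutingNode: an id and a (not necessarily unique) name.
    record NodeStub(String id, String name) {}

    public static void main(String[] args) {
        List<NodeStub> routingNodes = List.of(
            new NodeStub("idA", "data-a"),
            new NodeStub("idB", "data-a"), // two live nodes happen to share the name "data-a"
            new NodeStub("idC", "data-b")
        );

        // Old pattern: toMap with a merge function keeps only one id per name,
        // and the later lookup returns null for a name that is no longer present.
        Map<String, String> nameToSingleId = routingNodes.stream()
            .collect(Collectors.toMap(NodeStub::name, NodeStub::id, (s1, s2) -> s2));
        System.out.println(nameToSingleId); // only one of idA/idB survives for "data-a"

        // New pattern: groupingBy + mapping keeps every id that shares a name.
        Map<String, List<String>> nameToIds = routingNodes.stream()
            .collect(Collectors.groupingBy(NodeStub::name, Collectors.mapping(NodeStub::id, Collectors.toList())));
        System.out.println(nameToIds); // "data-a" maps to both idA and idB

        // Ids of nodes that still exist, used to drop ids of already-removed nodes.
        Set<String> liveIds = routingNodes.stream().map(NodeStub::id).collect(Collectors.toSet());

        // A REPLACE shutdown whose source node has already left the cluster and whose
        // target is named "data-a" (both values are made up for the example).
        String sourceNodeId = "removed-source-id";
        String targetNodeName = "data-a";

        Set<String> nodesPartOfReplacement = Stream.concat(
            Stream.of(sourceNodeId),
            nameToIds.getOrDefault(targetNodeName, List.of()).stream()
        ).filter(liveIds::contains).collect(Collectors.toSet());

        System.out.println(nodesPartOfReplacement); // the stale source id is filtered out; idA and idB remain
    }
}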
@@ -1409,6 +1409,78 @@ public void testSkipDiskThresholdMonitorWhenStateNotRecovered() {
assertNull(result2.v2());
}

private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
final var projectId = randomProjectIdOrDefault();
final Metadata.Builder metadataBuilder = Metadata.builder()
.put(
ProjectMetadata.builder(projectId)
.put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
.build()
);

metadataBuilder.putCustom(
NodesShutdownMetadata.TYPE,
new NodesShutdownMetadata(
Collections.singletonMap(
"node1",
SingleNodeShutdownMetadata.builder()
.setNodeId("node1")
.setNodeEphemeralId("node1")
.setReason("testing")
.setType(SingleNodeShutdownMetadata.Type.REPLACE)
.setTargetNodeName("node3")
.setStartedAtMillis(randomNonNegativeLong())
.build()
)
)
);

final Metadata metadata = metadataBuilder.build();
final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
.addAsNew(metadata.getProject(projectId).index("test"))
.build();
DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder()
// .add(newNormalNode("node1", "node1"))
.add(newNormalNode("node2", "node2"));
// node1, which is replaced by node3, may or may not be in the cluster
if (sourceNodeInTable) {
discoveryNodes.add(newNormalNode("node1", "node1"));
}
// node3 which is to replace node1 may or may not be in the cluster
if (targetNodeInTable) {
discoveryNodes.add(newNormalNode("node3", "node3"));
}
final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.routingTable(GlobalRoutingTable.builder().put(projectId, routingTable).build())
.nodes(discoveryNodes)
.build(),
createAllocationService(Settings.EMPTY)
);
final Index testIndex = routingTable.index("test").getIndex();

Map<String, DiskUsage> diskUsages = new HashMap<>();
diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
final ClusterInfo clusterInfo = clusterInfo(diskUsages);
Tuple<Boolean, Set<Index>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
assertTrue(result.v1()); // reroute on new nodes
assertThat(result.v2(), contains(testIndex));
}

public void testSkipReplaceSourceNodeNotInRoutingTable() {
doTestSkipNodesNotInRoutingTable(false, true);
}

public void testSkipReplaceTargetNodeNotInRoutingTable() {
doTestSkipNodesNotInRoutingTable(true, false);
}

public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
doTestSkipNodesNotInRoutingTable(false, false);
}

// Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
// happen and any indices that should be marked as read-only.
private Tuple<Boolean, Set<Index>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {