Skip to content

Commit db8c62b

Browse files
authored
[8.19] Better handling of node ids from shutdown metadata (elastic#128298) (elastic#128364)
* Better handling of node ids from shutdown metadata (elastic#128298) Currently, the DiskThresholdMonitor code considers shutdown metadata to identify nodes that are being replaced. If the node-to-be-replaced (source) leaves the cluster before the corresponding shutdown metadata is removed from the cluster state, we can have an NPE. This PR adds a test for that and improves the code a bit to handle node ids for source and target from shutdown metadata. Fixes elastic#100201 * remove project stuff from backport
1 parent ab89b87 commit db8c62b

File tree

3 files changed

+79
-5
lines changed

3 files changed

+79
-5
lines changed

docs/changelog/128298.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 128298
2+
summary: Better handling of node ids from shutdown metadata (avoid NPE on already removed nodes)
3+
area: Infra/Node Lifecycle
4+
type: bug
5+
issues:
6+
- 100201

server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.HashSet;
4343
import java.util.List;
4444
import java.util.Map;
45-
import java.util.Objects;
4645
import java.util.Set;
4746
import java.util.concurrent.atomic.AtomicBoolean;
4847
import java.util.concurrent.atomic.AtomicLong;
@@ -368,9 +367,12 @@ public void onNewInfo(ClusterInfo info) {
368367
}
369368

370369
// Generate a map of node name to ID so we can use it to look up node replacement targets
371-
final Map<String, String> nodeNameToId = state.getRoutingNodes()
370+
final Map<String, List<String>> nodeNameToIds = state.getRoutingNodes()
372371
.stream()
373-
.collect(Collectors.toMap(rn -> rn.node().getName(), RoutingNode::nodeId, (s1, s2) -> s2));
372+
.collect(Collectors.groupingBy(rn -> rn.node().getName(), Collectors.mapping(RoutingNode::nodeId, Collectors.toList())));
373+
374+
// Generate a set of the valid node IDs so we can use it to filter valid sources
375+
final Set<String> routingNodeIds = state.getRoutingNodes().stream().map(RoutingNode::nodeId).collect(Collectors.toSet());
374376

375377
// Calculate both the source node id and the target node id of a "replace" type shutdown
376378
final Set<String> nodesIdsPartOfReplacement = state.metadata()
@@ -379,8 +381,8 @@ public void onNewInfo(ClusterInfo info) {
379381
.values()
380382
.stream()
381383
.filter(meta -> meta.getType() == SingleNodeShutdownMetadata.Type.REPLACE)
382-
.flatMap(meta -> Stream.of(meta.getNodeId(), nodeNameToId.get(meta.getTargetNodeName())))
383-
.filter(Objects::nonNull) // The REPLACE target node might not still be in RoutingNodes
384+
.flatMap(meta -> Stream.concat(Stream.of(meta.getNodeId()), nodeIdsOrEmpty(meta, nodeNameToIds)))
385+
.filter(routingNodeIds::contains) // The REPLACE source node might already have been removed from RoutingNodes
384386
.collect(Collectors.toSet());
385387

386388
// Generate a set of all the indices that exist on either the target or source of a node replacement
@@ -420,6 +422,11 @@ public void onNewInfo(ClusterInfo info) {
420422
}
421423
}
422424

425+
private static Stream<String> nodeIdsOrEmpty(SingleNodeShutdownMetadata meta, Map<String, List<String>> nodeNameToIds) {
426+
var ids = nodeNameToIds.get(meta.getTargetNodeName()); // The REPLACE target node might not still be in RoutingNodes
427+
return ids == null ? Stream.empty() : ids.stream();
428+
}
429+
423430
// exposed for tests to override
424431
long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
425432
return DiskThresholdDecider.sizeOfUnaccountedShards(

server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.core.Releasable;
3838
import org.elasticsearch.core.Tuple;
3939
import org.elasticsearch.gateway.GatewayService;
40+
import org.elasticsearch.index.Index;
4041
import org.elasticsearch.index.IndexVersion;
4142
import org.elasticsearch.index.shard.ShardId;
4243
import org.elasticsearch.test.MockLog;
@@ -1337,6 +1338,66 @@ public void testSkipDiskThresholdMonitorWhenStateNotRecovered() {
13371338
assertNull(result2.v2());
13381339
}
13391340

1341+
private void doTestSkipNodesNotInRoutingTable(boolean sourceNodeInTable, boolean targetNodeInTable) {
1342+
Metadata.Builder metadataBuilder = Metadata.builder()
1343+
.put(IndexMetadata.builder("test").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(1))
1344+
.putCustom(
1345+
NodesShutdownMetadata.TYPE,
1346+
new NodesShutdownMetadata(
1347+
Collections.singletonMap(
1348+
"node1",
1349+
SingleNodeShutdownMetadata.builder()
1350+
.setNodeId("node1")
1351+
.setNodeEphemeralId("node1")
1352+
.setReason("testing")
1353+
.setType(SingleNodeShutdownMetadata.Type.REPLACE)
1354+
.setTargetNodeName("node3")
1355+
.setStartedAtMillis(randomNonNegativeLong())
1356+
.build()
1357+
)
1358+
)
1359+
);
1360+
1361+
final Metadata metadata = metadataBuilder.build();
1362+
final RoutingTable routingTable = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY)
1363+
.addAsNew(metadata.index("test"))
1364+
.build();
1365+
DiscoveryNodes.Builder discoveryNodes = DiscoveryNodes.builder().add(newNormalNode("node2", "node2"));
1366+
// node1 which is replaced by node3 may or may not be in the cluster
1367+
if (sourceNodeInTable) {
1368+
discoveryNodes.add(newNormalNode("node1", "node1"));
1369+
}
1370+
// node3 which is to replace node1 may or may not be in the cluster
1371+
if (targetNodeInTable) {
1372+
discoveryNodes.add(newNormalNode("node3", "node3"));
1373+
}
1374+
final ClusterState clusterState = applyStartedShardsUntilNoChange(
1375+
ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(discoveryNodes).build(),
1376+
createAllocationService(Settings.EMPTY)
1377+
);
1378+
final Index testIndex = routingTable.index("test").getIndex();
1379+
1380+
Map<String, DiskUsage> diskUsages = new HashMap<>();
1381+
diskUsages.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
1382+
diskUsages.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
1383+
final ClusterInfo clusterInfo = clusterInfo(diskUsages);
1384+
Tuple<Boolean, Set<String>> result = runDiskThresholdMonitor(clusterState, clusterInfo);
1385+
assertTrue(result.v1()); // reroute on new nodes
1386+
assertEquals(Set.of("test"), result.v2());
1387+
}
1388+
1389+
public void testSkipReplaceSourceNodeNotInRoutingTable() {
1390+
doTestSkipNodesNotInRoutingTable(false, true);
1391+
}
1392+
1393+
public void testSkipReplaceTargetNodeNotInRoutingTable() {
1394+
doTestSkipNodesNotInRoutingTable(true, false);
1395+
}
1396+
1397+
public void testSkipReplaceSourceAndTargetNodesNotInRoutingTable() {
1398+
doTestSkipNodesNotInRoutingTable(false, false);
1399+
}
1400+
13401401
// Runs a disk threshold monitor with a given cluster state and cluster info and returns whether a reroute should
13411402
// happen and any indices that should be marked as read-only.
13421403
private Tuple<Boolean, Set<String>> runDiskThresholdMonitor(ClusterState clusterState, ClusterInfo clusterInfo) {

0 commit comments

Comments
 (0)