|
29 | 29 | import org.elasticsearch.common.Priority; |
30 | 30 | import org.elasticsearch.common.Strings; |
31 | 31 | import org.elasticsearch.features.FeatureService; |
| 32 | +import org.elasticsearch.features.NodeFeature; |
32 | 33 | import org.elasticsearch.index.IndexVersion; |
33 | 34 | import org.elasticsearch.index.IndexVersions; |
34 | 35 | import org.elasticsearch.persistent.PersistentTasksCustomMetadata; |
|
39 | 40 | import java.util.Comparator; |
40 | 41 | import java.util.HashMap; |
41 | 42 | import java.util.HashSet; |
| 43 | +import java.util.Iterator; |
42 | 44 | import java.util.List; |
43 | 45 | import java.util.Map; |
44 | 46 | import java.util.Objects; |
@@ -137,8 +139,8 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex |
137 | 139 |
|
138 | 140 | DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); |
139 | 141 | Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions()); |
140 | | - Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); |
141 | | - Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values()); |
| 142 | + Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state |
| 143 | + Set<String> effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures); |
142 | 144 |
|
143 | 145 | assert nodesBuilder.isLocalNodeElectedMaster(); |
144 | 146 |
|
@@ -174,14 +176,17 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex |
174 | 176 | } |
175 | 177 | blockForbiddenVersions(compatibilityVersions.transportVersion()); |
176 | 178 | ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion); |
177 | | - enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features); |
| 179 | + Set<String> newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features); |
178 | 180 | // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices |
179 | 181 | // we have to reject nodes that don't support all indices we have in this cluster |
180 | 182 | ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata()); |
| 183 | + |
181 | 184 | nodesBuilder.add(node); |
182 | 185 | compatibilityVersionsMap.put(node.getId(), compatibilityVersions); |
| 186 | + // store the actual node features here, not including assumed features, as this is persisted in cluster state |
183 | 187 | nodeFeatures.put(node.getId(), features); |
184 | | - allNodesFeatures.retainAll(features); |
| 188 | + effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures); |
| 189 | + |
185 | 190 | nodesChanged = true; |
186 | 191 | minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); |
187 | 192 | maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); |
@@ -355,6 +360,35 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers |
355 | 360 | } |
356 | 361 | } |
357 | 362 |
|
| 363 | + /** |
| 364 | + * Calculate the cluster's effective features. This includes all features that are assumed on any nodes in the cluster, |
| 365 | + * that are also present across the whole cluster as a result. |
| 366 | + */ |
| 367 | + private Set<String> calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) { |
| 368 | + if (featureService.featuresCanBeAssumedForNodes(nodes)) { |
| 369 | + Set<String> assumedFeatures = featureService.getNodeFeatures() |
| 370 | + .values() |
| 371 | + .stream() |
| 372 | + .filter(NodeFeature::assumedAfterNextCompatibilityBoundary) |
| 373 | + .map(NodeFeature::id) |
| 374 | + .collect(Collectors.toSet()); |
| 375 | + |
| 376 | + // add all assumed features to the featureset of all nodes of the next major version |
| 377 | + nodeFeatures = new HashMap<>(nodeFeatures); |
| 378 | + for (var node : nodes.getNodes().entrySet()) { |
| 379 | + if (featureService.featuresCanBeAssumedForNode(node.getValue())) { |
| 380 | + assert nodeFeatures.containsKey(node.getKey()) : "Node " + node.getKey() + " does not have any features"; |
| 381 | + nodeFeatures.computeIfPresent(node.getKey(), (k, v) -> { |
| 382 | + var newFeatures = new HashSet<>(v); |
| 383 | + return newFeatures.addAll(assumedFeatures) ? newFeatures : v; |
| 384 | + }); |
| 385 | + } |
| 386 | + } |
| 387 | + } |
| 388 | + |
| 389 | + return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values()); |
| 390 | + } |
| 391 | + |
358 | 392 | /** |
359 | 393 | * Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata |
360 | 394 | * will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index |
@@ -461,13 +495,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC |
461 | 495 | } |
462 | 496 | } |
463 | 497 |
|
464 | | - private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) { |
| 498 | + /** |
| 499 | + * Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster |
| 500 | + * |
| 501 | + * @return The set of features that this node has (including assumed features) |
| 502 | + */ |
| 503 | + private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> effectiveClusterFeatures, Set<String> newNodeFeatures) { |
465 | 504 | // prevent join if it does not have one or more features that all other nodes have |
466 | | - Set<String> missingFeatures = new HashSet<>(existingNodesFeatures); |
| 505 | + Set<String> missingFeatures = new HashSet<>(effectiveClusterFeatures); |
467 | 506 | missingFeatures.removeAll(newNodeFeatures); |
468 | 507 |
|
469 | | - if (missingFeatures.isEmpty() == false) { |
470 | | - throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures); |
| 508 | + if (missingFeatures.isEmpty()) { |
| 509 | + // nothing missing - all ok |
| 510 | + return newNodeFeatures; |
| 511 | + } |
| 512 | + |
| 513 | + if (featureService.featuresCanBeAssumedForNode(node)) { |
| 514 | + // it might still be ok for this node to join if this node can have assumed features, |
| 515 | + // and all the missing features are assumed |
| 516 | + // we can get the NodeFeature object direct from this node's registered features |
| 517 | + // as all existing nodes in the cluster have the features present in existingNodesFeatures, including this one |
| 518 | + newNodeFeatures = new HashSet<>(newNodeFeatures); |
| 519 | + for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) { |
| 520 | + String feature = it.next(); |
| 521 | + NodeFeature nf = featureService.getNodeFeatures().get(feature); |
| 522 | + if (nf.assumedAfterNextCompatibilityBoundary()) { |
| 523 | + // its ok for this feature to be missing from this node |
| 524 | + it.remove(); |
| 525 | + // and it should be assumed to still be in the cluster |
| 526 | + newNodeFeatures.add(feature); |
| 527 | + } |
| 528 | + // even if we don't remove it, still continue, so the exception message below is accurate |
| 529 | + } |
| 530 | + } |
| 531 | + |
| 532 | + if (missingFeatures.isEmpty()) { |
| 533 | + return newNodeFeatures; |
| 534 | + } else { |
| 535 | + throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures); |
471 | 536 | } |
472 | 537 | } |
473 | 538 |
|
|
0 commit comments