diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java new file mode 100644 index 0000000000000..9fdde19ac3c33 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java @@ -0,0 +1,258 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.coordination; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportService; + +import java.util.Collection; +import java.util.List; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class NodeJoiningIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + // detect leader failover quickly + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .build(); + } + + public void testNodeJoinsCluster() { + internalCluster().startNodes(3); + String masterNodeName = internalCluster().getMasterName(); + int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size(); + int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName) + .state() + .nodes() + .getMasterNodes() + .size(); + List namesOfDataNodesInOriginalCluster = getListOfDataNodeNamesFromCluster(masterNodeName); + + // Attempt to add new node + String newNodeName = internalCluster().startDataOnlyNode(); + ensureStableCluster(4); + + // Assert the new data node was added + ClusterState state = internalCluster().clusterService(masterNodeName).state(); + assertEquals(numberOfNodesOriginallyInCluster + 1, state.nodes().getSize()); + assertEquals(namesOfDataNodesInOriginalCluster.size() + 1, state.nodes().getDataNodes().size()); + assertEquals(numberOfMasterNodesOriginallyInCluster, state.nodes().getMasterNodes().size()); + + List namesOfDataNodesInNewCluster = getListOfDataNodeNamesFromCluster(masterNodeName); + assertTrue(namesOfDataNodesInNewCluster.contains(newNodeName)); + for (String nodeName : namesOfDataNodesInOriginalCluster) { + assertTrue(namesOfDataNodesInNewCluster.contains(nodeName)); + } + } + + @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO") + public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() { + List nodeNames = internalCluster().startNodes(3); + ensureStableCluster(3); + String originalMasterNodeName = internalCluster().getMasterName(); + int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size(); + // Determine upfront who we want the next master to be + final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames)); + + // Ensure the logging is as expected + try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) { + + // Sets MockTransportService behaviour + for (final var transportService : internalCluster().getInstances(TransportService.class)) { + final var mockTransportService = asInstanceOf(MockTransportService.class, transportService); + + if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName) == false) { + List listOfActionsToBlock = List.of( + // This forces the current master node to fail + PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME, + // This disables pre-voting on all nodes except the new master, forcing it to win the election + StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME + ); + blockActionNameOnMockTransportService(mockTransportService, listOfActionsToBlock); + } + } + + // We do not expect to see a WARN log about a node disconnecting (#ES-11449) + addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog); + + // We haven't changed master nodes yet + assertEquals(originalMasterNodeName, internalCluster().getMasterName()); + + // Sends a node join request to the original master node. This will fail, and cause a master failover + // startDataOnlyNode waits for the new node to be added, and this can only occur after a re-election + String newNodeName = internalCluster().startDataOnlyNode(); + assertNotEquals(originalMasterNodeName, internalCluster().getMasterName()); + logger.info("New master is elected"); + + // Assert all nodes have accepted N into their cluster state + assertNewNodeIsInAllClusterStates(newNodeName); + + mockLog.assertAllExpectationsMatched(); + + // Assert the new data node was added + DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes(); + assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize()); + assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName)); + } + } + + /* + In this scenario, node N attempts to join a cluster, there is an election and the original master is re-elected. + Node N should join the cluster, but it should not be disconnected (#ES-11449) + */ + @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO") + public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() { + internalCluster().startNodes(3); + ensureStableCluster(3); + String masterNodeName = internalCluster().getMasterName(); + + long originalTerm = getTerm(masterNodeName); + int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size(); + + try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) { + for (String nodeName : internalCluster().getNodeNames()) { + final var mockTransportService = MockTransportService.getInstance(nodeName); + + if (nodeName.equals(masterNodeName)) { + // This makes the master fail, forcing a re-election + blockActionNameOnMockTransportService( + mockTransportService, + List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME) + ); + + // Wait until the master has stepped down before removing the publishing ban + // This allows the master to be re-elected + ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> { + DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode(); + boolean hasMasterSteppedDown = currentMasterNode == null + || currentMasterNode.getName().equals(masterNodeName) == false; + if (hasMasterSteppedDown) { + logger.info("Master publishing ban removed"); + mockTransportService.addSendBehavior(Transport.Connection::sendRequest); + } + return hasMasterSteppedDown; + }); + + } else { + // This disables pre-voting on all nodes except the master, forcing it to win the election + blockActionNameOnMockTransportService( + mockTransportService, + List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME) + ); + } + } + + // We expect the node join request to fail with a FailedToCommitClusterStateException + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "failed to commit cluster state", + MasterService.class.getCanonicalName(), + Level.WARN, + "failed to commit cluster state" + ) + ); + + /* + We expect the cluster to reuse the connection to N and not disconnect it + Therefore, this WARN log should not be thrown (#ES-11449) + */ + addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog); + + // Before we add the new node, assert we haven't changed master nodes yet + assertEquals(masterNodeName, internalCluster().getMasterName()); + + // Sends a node join request to the original master node. This will fail, and cause a master failover + logger.info("Sending node join request"); + String newNodeName = internalCluster().startDataOnlyNode(); + + // Assert the master was re-elected + assertEquals(masterNodeName, internalCluster().getMasterName()); + assertTrue(originalTerm < getTerm(masterNodeName)); + + // Assert all nodes have accepted N into their cluster state + assertNewNodeIsInAllClusterStates(newNodeName); + + // If the WARN log was thrown, then the connection to N was disconnected so fail the test + mockLog.assertAllExpectationsMatched(); + + // Assert the new data node was added + DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes(); + assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize()); + assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName)); + } + } + + private long getTerm(String masterNodeName) { + return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term(); + } + + private void assertNewNodeIsInAllClusterStates(String newNodeName) { + for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) { + assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName)); + } + } + + private List getListOfDataNodeNamesFromCluster(String nodeName) { + return internalCluster().clusterService(nodeName) + .state() + .getNodes() + .getDataNodes() + .values() + .stream() + .map(DiscoveryNode::getName) + .toList(); + } + + private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) { + mockLog.addExpectation( + new MockLog.UnseenEventExpectation( + "warn message with troubleshooting link", + "org.elasticsearch.cluster.coordination.NodeJoinExecutor", + Level.WARN, + "*" + ) + ); + } + + private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List actionNamesToBlock) { + mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (actionNamesToBlock.contains(action)) { + throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode()); + } else { + connection.sendRequest(requestId, action, request, options); + } + }); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 9ea65ee20dd1f..747cb4f2deed5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -19,8 +19,10 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStatePublicationEvent; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.LocalMasterServiceTask; @@ -40,6 +42,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplierService; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.cluster.version.CompatibilityVersions; @@ -191,6 +194,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt private final NodeHealthService nodeHealthService; private final List peerFinderListeners; private final LeaderHeartbeatService leaderHeartbeatService; + private final ClusterService clusterService; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -219,7 +223,8 @@ public Coordinator( LeaderHeartbeatService leaderHeartbeatService, PreVoteCollector.Factory preVoteCollectorFactory, CompatibilityVersions compatibilityVersions, - FeatureService featureService + FeatureService featureService, + ClusterService clusterService ) { this.settings = settings; this.transportService = transportService; @@ -329,6 +334,7 @@ public Coordinator( this.peerFinderListeners.add(clusterBootstrapService); this.leaderHeartbeatService = leaderHeartbeatService; this.compatibilityVersions = compatibilityVersions; + this.clusterService = clusterService; } /** @@ -663,11 +669,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener joi transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() { @Override public void onResponse(Releasable response) { - validateJoinRequest( - joinRequest, - ActionListener.runBefore(joinListener, () -> Releasables.close(response)) - .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) - ); + SubscribableListener + // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings? + .newForked(l -> validateJoinRequest(joinRequest, l)) + + // Adds the joining node to the cluster state + .andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> { + // #ES-11449 + if (e instanceof FailedToCommitClusterStateException) { + // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed: + // the next master may have already accepted the state that we just published and will therefore include the + // joining node in its future states too. Thus, we need to wait for the next committed state before we know the + // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete + // the listener. + + // NB we are on the master update thread here at the end of processing the failed cluster state update, so this + // all happens before any cluster state update that re-elects a master + assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME); + + final ClusterStateListener clusterStateListener = new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + final var discoveryNodes = event.state().nodes(); + // Keep the connection open until the next committed state + if (discoveryNodes.getMasterNode() != null) { + // Remove this listener to avoid memory leaks + clusterService.removeListener(this); + if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) { + ll.onResponse(null); + } else { + ll.onFailure(e); + } + } + } + }; + clusterService.addListener(clusterStateListener); + clusterStateListener.clusterChanged( + new ClusterChangedEvent( + "Checking if another master has been elected since " + + joinRequest.getSourceNode().getName() + + " attempted to join cluster", + clusterService.state(), + clusterService.state() + ) + ); + } else { + ll.onFailure(e); + } + }))) + + // Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node. + .addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response))); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index a60044076234e..7277a5caaf229 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Randomness; @@ -107,7 +108,8 @@ public DiscoveryModule( NodeHealthService nodeHealthService, CircuitBreakerService circuitBreakerService, CompatibilityVersions compatibilityVersions, - FeatureService featureService + FeatureService featureService, + ClusterService clusterService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -194,7 +196,8 @@ public DiscoveryModule( leaderHeartbeatService, preVoteCollectorFactory, compatibilityVersions, - featureService + featureService, + clusterService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 201bf648439c5..0fec7f87be6bf 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1742,7 +1742,8 @@ private DiscoveryModule createDiscoveryModule( fsHealthService, circuitBreakerService, compatibilityVersions, - featureService + featureService, + clusterService ); modules.add(module, b -> { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index 4ff247ff835fe..377b91c7320a8 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.service.MasterServiceTests; @@ -212,8 +213,16 @@ protected void onSendRequest(long requestId, String action, TransportRequest req clusterSettings, Collections.emptySet() ); + String nodeName = "test_node"; + Settings settings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build(); + ClusterService clusterService = new ClusterService( + settings, + clusterSettings, + threadPool, + new TaskManager(settings, threadPool, Set.of()) + ); coordinator = new Coordinator( - "test_node", + nodeName, Settings.EMPTY, clusterSettings, transportService, @@ -234,7 +243,8 @@ protected void onSendRequest(long requestId, String action, TransportRequest req LeaderHeartbeatService.NO_OP, StatefulPreVoteCollector::new, CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()) + new FeatureService(List.of()), + clusterService ); transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java index 94f8700c922e8..ce27b2499a190 100644 --- a/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.cluster.version.CompatibilityVersionsUtils; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -61,6 +62,7 @@ public class DiscoveryModuleTests extends ESTestCase { private ClusterApplier clusterApplier; private ClusterSettings clusterSettings; private GatewayMetaState gatewayMetaState; + private ClusterService clusterService; public interface DummyHostsProviderPlugin extends DiscoveryPlugin { Map> impl(); @@ -88,6 +90,7 @@ public void setupDummyServices() { clusterApplier = mock(ClusterApplier.class); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); gatewayMetaState = mock(GatewayMetaState.class); + clusterService = mock(ClusterService.class); } @After @@ -118,7 +121,8 @@ private DiscoveryModule newModule( null, new NoneCircuitBreakerService(), CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()) + new FeatureService(List.of()), + clusterService ); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d6a57ba4587c5..929a6e513eb9b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2984,7 +2984,8 @@ public void start(ClusterState initialState) { LeaderHeartbeatService.NO_OP, StatefulPreVoteCollector::new, CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()) + new FeatureService(List.of()), + this.clusterService ); masterService.setClusterStatePublisher(coordinator); coordinator.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index b8f225aedc0de..3e431eab4a15f 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -1171,7 +1171,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { coordinationServices.getLeaderHeartbeatService(), coordinationServices.getPreVoteCollectorFactory(), CompatibilityVersionsUtils.staticCurrent(), - new FeatureService(List.of()) + new FeatureService(List.of()), + clusterService ); coordinationDiagnosticsService = new CoordinationDiagnosticsService( clusterService,