Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Level;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.List;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class NodeJoiningIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
// detect leader failover quickly
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.build();
}

public void testNodeJoinsCluster() {
internalCluster().startNodes(3);
String masterNodeName = internalCluster().getMasterName();
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();
int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName)
.state()
.nodes()
.getMasterNodes()
.size();
List<String> namesOfDataNodesInOriginalCluster = getListOfDataNodeNamesFromCluster(masterNodeName);

// Attempt to add new node
String newNodeName = internalCluster().startDataOnlyNode();
ensureStableCluster(4);

// Assert the new data node was added
ClusterState state = internalCluster().clusterService(masterNodeName).state();
assertEquals(numberOfNodesOriginallyInCluster + 1, state.nodes().getSize());
assertEquals(namesOfDataNodesInOriginalCluster.size() + 1, state.nodes().getDataNodes().size());
assertEquals(numberOfMasterNodesOriginallyInCluster, state.nodes().getMasterNodes().size());

List<String> namesOfDataNodesInNewCluster = getListOfDataNodeNamesFromCluster(masterNodeName);
assertTrue(namesOfDataNodesInNewCluster.contains(newNodeName));
for (String nodeName : namesOfDataNodesInOriginalCluster) {
assertTrue(namesOfDataNodesInNewCluster.contains(nodeName));
}
}

@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO")
public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
List<String> nodeNames = internalCluster().startNodes(3);
ensureStableCluster(3);
String originalMasterNodeName = internalCluster().getMasterName();
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size();
// Determine upfront who we want the next master to be
final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames));

// Ensure the logging is as expected
try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) {

// Sets MockTransportService behaviour
for (final var transportService : internalCluster().getInstances(TransportService.class)) {
final var mockTransportService = asInstanceOf(MockTransportService.class, transportService);

if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName) == false) {
List<String> listOfActionsToBlock = List.of(
// This forces the current master node to fail
PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
// This disables pre-voting on all nodes except the new master, forcing it to win the election
StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME
);
blockActionNameOnMockTransportService(mockTransportService, listOfActionsToBlock);
}
}

// We do not expect to see a WARN log about a node disconnecting (#ES-11449)
addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);

// We haven't changed master nodes yet
assertEquals(originalMasterNodeName, internalCluster().getMasterName());

// Sends a node join request to the original master node. This will fail, and cause a master failover
// startDataOnlyNode waits for the new node to be added, and this can only occur after a re-election
String newNodeName = internalCluster().startDataOnlyNode();
assertNotEquals(originalMasterNodeName, internalCluster().getMasterName());
logger.info("New master is elected");

// Assert all nodes have accepted N into their cluster state
assertNewNodeIsInAllClusterStates(newNodeName);

mockLog.assertAllExpectationsMatched();

// Assert the new data node was added
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName));
}
}

/*
In this scenario, node N attempts to join a cluster, there is an election and the original master is re-elected.
Node N should join the cluster, but it should not be disconnected (#ES-11449)
*/
@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO")
public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {
internalCluster().startNodes(3);
ensureStableCluster(3);
String masterNodeName = internalCluster().getMasterName();

long originalTerm = getTerm(masterNodeName);
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();

try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) {
for (String nodeName : internalCluster().getNodeNames()) {
final var mockTransportService = MockTransportService.getInstance(nodeName);

if (nodeName.equals(masterNodeName)) {
// This makes the master fail, forcing a re-election
blockActionNameOnMockTransportService(
mockTransportService,
List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME)
);

// Wait until the master has stepped down before removing the publishing ban
// This allows the master to be re-elected
ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> {
DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
boolean hasMasterSteppedDown = currentMasterNode == null
|| currentMasterNode.getName().equals(masterNodeName) == false;
if (hasMasterSteppedDown) {
logger.info("Master publishing ban removed");
mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
}
return hasMasterSteppedDown;
});

} else {
// This disables pre-voting on all nodes except the master, forcing it to win the election
blockActionNameOnMockTransportService(
mockTransportService,
List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME)
);
}
}

// We expect the node join request to fail with a FailedToCommitClusterStateException
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"failed to commit cluster state",
MasterService.class.getCanonicalName(),
Level.WARN,
"failed to commit cluster state"
)
);

/*
We expect the cluster to reuse the connection to N and not disconnect it
Therefore, this WARN log should not be thrown (#ES-11449)
*/
addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);

// Before we add the new node, assert we haven't changed master nodes yet
assertEquals(masterNodeName, internalCluster().getMasterName());

// Sends a node join request to the original master node. This will fail, and cause a master failover
logger.info("Sending node join request");
String newNodeName = internalCluster().startDataOnlyNode();

// Assert the master was re-elected
assertEquals(masterNodeName, internalCluster().getMasterName());
assertTrue(originalTerm < getTerm(masterNodeName));

// Assert all nodes have accepted N into their cluster state
assertNewNodeIsInAllClusterStates(newNodeName);

// If the WARN log was thrown, then the connection to N was disconnected so fail the test
mockLog.assertAllExpectationsMatched();

// Assert the new data node was added
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName));
}
}

private long getTerm(String masterNodeName) {
return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term();
}

private void assertNewNodeIsInAllClusterStates(String newNodeName) {
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName));
}
}

private List<String> getListOfDataNodeNamesFromCluster(String nodeName) {
return internalCluster().clusterService(nodeName)
.state()
.getNodes()
.getDataNodes()
.values()
.stream()
.map(DiscoveryNode::getName)
.toList();
}

private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) {
mockLog.addExpectation(
new MockLog.UnseenEventExpectation(
"warn message with troubleshooting link",
"org.elasticsearch.cluster.coordination.NodeJoinExecutor",
Level.WARN,
"*"
)
);
}

private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List<String> actionNamesToBlock) {
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (actionNamesToBlock.contains(action)) {
throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode());
} else {
connection.sendRequest(requestId, action, request, options);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalMasterServiceTask;
Expand All @@ -39,6 +41,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.cluster.version.CompatibilityVersions;
Expand Down Expand Up @@ -190,6 +193,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
private final NodeHealthService nodeHealthService;
private final List<PeerFinderListener> peerFinderListeners;
private final LeaderHeartbeatService leaderHeartbeatService;
private final ClusterService clusterService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -218,7 +222,8 @@ public Coordinator(
LeaderHeartbeatService leaderHeartbeatService,
PreVoteCollector.Factory preVoteCollectorFactory,
CompatibilityVersions compatibilityVersions,
FeatureService featureService
FeatureService featureService,
ClusterService clusterService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -328,6 +333,7 @@ public Coordinator(
this.peerFinderListeners.add(clusterBootstrapService);
this.leaderHeartbeatService = leaderHeartbeatService;
this.compatibilityVersions = compatibilityVersions;
this.clusterService = clusterService;
}

/**
Expand Down Expand Up @@ -662,11 +668,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
@Override
public void onResponse(Releasable response) {
validateJoinRequest(
joinRequest,
ActionListener.runBefore(joinListener, () -> Releasables.close(response))
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))
);
SubscribableListener
// Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
.<Void>newForked(l -> validateJoinRequest(joinRequest, l))

// Adds the joining node to the cluster state
.<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
// #ES-11449
if (e instanceof FailedToCommitClusterStateException) {
// The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
// the next master may have already accepted the state that we just published and will therefore include the
// joining node in its future states too. Thus we need to wait for the next committed state before we know the
// eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
// the listener.

// NB we are on the master update thread here at the end of processing the failed cluster state update, so this
// all happens before any cluster state update that re-elects a master
// assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);

Comment on lines 686 to 689
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is currently a lie, and we definitely shouldn't have any commented-out code like this. At the very least we should delete these lines.

But the issue here is that this indicates that we're misusing FailedToCommitClusterStateException and throwing it from places where the cluster state update has not even been published, which means we can be certain its effects won't appear in some future state. In those cases we don't need to delay the completion of the join listener. It's kind of ok, but would you investigate whether we can reasonably fix these to use a different exception (e.g. NotMasterException) and tighten up the meaning of FailedToCommitClusterStateException so that it can only happen if we're unsure of the outcome?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you investigate whether we can reasonably fix these

If so, that should be a separate PR which lands ahead of this one.

Copy link
Contributor Author

@joshua-adams-1 joshua-adams-1 Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least we should delete these lines.

Apologies! I clearly commented out the code to run the tests and then forgot to delete them (insert face palm here)

investigate whether we can reasonably fix these to use a different exception

To confirm, this is to investigate where we're throwing the FailedToCommitClusterStateException and ensuring they're only thrown from appropriate places? I can do that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensuring they're only thrown from appropriate places

Yep

final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final var discoveryNodes = event.state().nodes();
// Keep the connection open until the next committed state
if (discoveryNodes.getMasterNode() != null) {
// Remove this listener to avoid memory leaks
clusterService.removeListener(this);
if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
ll.onResponse(null);
} else {
ll.onFailure(e);
}
}
}
};
clusterService.addListener(clusterStateListener);
clusterStateListener.clusterChanged(
new ClusterChangedEvent(
"Checking if another master has been elected since "
+ joinRequest.getSourceNode().getName()
+ " attempted to join cluster",
clusterService.state(),
clusterService.state()
)
);
} else {
ll.onFailure(e);
}
})))

// Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.
.addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));
}

@Override
Expand Down
Loading