Master node disconnect #132023
NodeJoiningIT.java (new file):

@@ -0,0 +1,257 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Level;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.List;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class NodeJoiningIT extends ESIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
    }

    @Override
    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
        return Settings.builder()
            .put(super.nodeSettings(nodeOrdinal, otherSettings))
            // detect leader failover quickly
            .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
            .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
            .build();
    }

    public void testNodeJoinsCluster() {
        internalCluster().startNodes(3);
        String masterNodeName = internalCluster().getMasterName();
        int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();
        int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName)
            .state()
            .nodes()
            .getMasterNodes()
            .size();
        List<String> namesOfDataNodesInOriginalCluster = getListOfDataNodeNamesFromCluster(masterNodeName);

        // Attempt to add a new node
        String newNodeName = internalCluster().startDataOnlyNode();
        ensureStableCluster(4);

        // Assert the new data node was added
        ClusterState state = internalCluster().clusterService(masterNodeName).state();
        assertEquals(numberOfNodesOriginallyInCluster + 1, state.nodes().getSize());
        assertEquals(namesOfDataNodesInOriginalCluster.size() + 1, state.nodes().getDataNodes().size());
        assertEquals(numberOfMasterNodesOriginallyInCluster, state.nodes().getMasterNodes().size());

        List<String> namesOfDataNodesInNewCluster = getListOfDataNodeNamesFromCluster(masterNodeName);
        assertTrue(namesOfDataNodesInNewCluster.contains(newNodeName));
        for (String nodeName : namesOfDataNodesInOriginalCluster) {
            assertTrue(namesOfDataNodesInNewCluster.contains(nodeName));
        }
    }

    @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO")
    public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
        List<String> nodeNames = internalCluster().startNodes(3);
        ensureStableCluster(3);
        String originalMasterNodeName = internalCluster().getMasterName();
        int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size();
        // Determine upfront who we want the next master to be
        final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames));

        // Ensure the logging is as expected
        try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) {

            // Set the MockTransportService behaviour
            for (final var transportService : internalCluster().getInstances(TransportService.class)) {
                final var mockTransportService = asInstanceOf(MockTransportService.class, transportService);

                if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName) == false) {
                    List<String> listOfActionsToBlock = List.of(
                        // This forces the current master node to fail
                        PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
                        // This disables pre-voting on all nodes except the new master, forcing it to win the election
                        StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME
                    );
                    blockActionNameOnMockTransportService(mockTransportService, listOfActionsToBlock);
                }
            }

            // We do not expect to see a WARN log about a node disconnecting (#ES-11449)
            addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);

            // We haven't changed master nodes yet
            assertEquals(originalMasterNodeName, internalCluster().getMasterName());

            // Sends a node join request to the original master node. This will fail and cause a master failover.
            // startDataOnlyNode waits for the new node to be added, and this can only occur after a re-election
            String newNodeName = internalCluster().startDataOnlyNode();
            assertNotEquals(originalMasterNodeName, internalCluster().getMasterName());
            logger.info("New master is elected");

            // Assert all nodes have accepted N into their cluster state
            assertNewNodeIsInAllClusterStates(newNodeName);

            mockLog.assertAllExpectationsMatched();

            // Assert the new data node was added
            DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
            assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
            assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName));
        }
    }

    /*
     * In this scenario, node N attempts to join a cluster, there is an election, and the original master is re-elected.
     * Node N should join the cluster and its connection should not be dropped (#ES-11449)
     */
    @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO")
    public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {
        internalCluster().startNodes(3);
        ensureStableCluster(3);
        String masterNodeName = internalCluster().getMasterName();

        long originalTerm = getTerm(masterNodeName);
        int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();

        try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) {
            for (String nodeName : internalCluster().getNodeNames()) {
                final var mockTransportService = MockTransportService.getInstance(nodeName);

                if (nodeName.equals(masterNodeName)) {
                    // This makes the master fail, forcing a re-election
                    blockActionNameOnMockTransportService(
                        mockTransportService,
                        List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME)
                    );

                    // Wait until the master has stepped down before removing the publishing ban
                    // This allows the master to be re-elected
                    ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> {
                        DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
                        boolean hasMasterSteppedDown = currentMasterNode == null
                            || currentMasterNode.getName().equals(masterNodeName) == false;
                        if (hasMasterSteppedDown) {
                            logger.info("Master publishing ban removed");
                            mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
                        }
                        return hasMasterSteppedDown;
                    });

                } else {
                    // This disables pre-voting on all nodes except the master, forcing it to win the election
                    blockActionNameOnMockTransportService(
                        mockTransportService,
                        List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME)
                    );
                }
            }

            // We expect the node join request to fail with a FailedToCommitClusterStateException
            mockLog.addExpectation(
                new MockLog.SeenEventExpectation(
                    "failed to commit cluster state",
                    MasterService.class.getCanonicalName(),
                    Level.WARN,
                    "failed to commit cluster state"
                )
            );

            /*
             * We expect the cluster to reuse the connection to N and not disconnect it.
             * Therefore, this WARN log should not be emitted (#ES-11449)
             */
            addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);

            // Before we add the new node, assert we haven't changed master nodes yet
            assertEquals(masterNodeName, internalCluster().getMasterName());

            // Sends a node join request to the original master node. The publication will fail and trigger a re-election,
            // which the original master wins
            logger.info("Sending node join request");
            String newNodeName = internalCluster().startDataOnlyNode();

            // Assert the same master was re-elected in a later term
            assertTrue(masterNodeName.equals(internalCluster().getMasterName()) && originalTerm < getTerm(masterNodeName));

            // Assert all nodes have accepted N into their cluster state
            assertNewNodeIsInAllClusterStates(newNodeName);

            // If the WARN log was emitted, then the connection to N was dropped, so fail the test
            mockLog.assertAllExpectationsMatched();

            // Assert the new data node was added
            DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
            assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
            assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName));
        }
    }

    private long getTerm(String masterNodeName) {
        return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term();
    }

    private void assertNewNodeIsInAllClusterStates(String newNodeName) {
        for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
            assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName));
        }
    }

    private List<String> getListOfDataNodeNamesFromCluster(String nodeName) {
        return internalCluster().clusterService(nodeName)
            .state()
            .getNodes()
            .getDataNodes()
            .values()
            .stream()
            .map(DiscoveryNode::getName)
            .toList();
    }

    private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) {
        mockLog.addExpectation(
            new MockLog.UnseenEventExpectation(
                "warn message with troubleshooting link",
                "org.elasticsearch.cluster.coordination.NodeJoinExecutor",
                Level.WARN,
                "*"
            )
        );
    }

    private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List<String> actionNamesToBlock) {
        mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
            if (actionNamesToBlock.contains(action)) {
                throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode());
            } else {
                connection.sendRequest(requestId, action, request, options);
            }
        });
    }
}
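To run just this suite locally, something along the lines of ./gradlew ':server:internalClusterTest' --tests "org.elasticsearch.cluster.coordination.NodeJoiningIT" should do it; the exact Gradle task name is an assumption here rather than something stated in this PR.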
Coordinator.java (added/removed lines marked with +/- as inferred from the hunk headers):

@@ -19,8 +19,10 @@
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.LocalMasterServiceTask;
@@ -39,6 +41,7 @@
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.ClusterApplierService;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.cluster.version.CompatibilityVersions;
@@ -190,6 +193,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
     private final NodeHealthService nodeHealthService;
     private final List<PeerFinderListener> peerFinderListeners;
     private final LeaderHeartbeatService leaderHeartbeatService;
+    private final ClusterService clusterService;

     /**
      * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -218,7 +222,8 @@ public Coordinator(
         LeaderHeartbeatService leaderHeartbeatService,
         PreVoteCollector.Factory preVoteCollectorFactory,
         CompatibilityVersions compatibilityVersions,
-        FeatureService featureService
+        FeatureService featureService,
+        ClusterService clusterService
     ) {
         this.settings = settings;
         this.transportService = transportService;
@@ -328,6 +333,7 @@ public Coordinator(
         this.peerFinderListeners.add(clusterBootstrapService);
         this.leaderHeartbeatService = leaderHeartbeatService;
         this.compatibilityVersions = compatibilityVersions;
+        this.clusterService = clusterService;
     }

     /**
@@ -662,11 +668,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi
         transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
             @Override
             public void onResponse(Releasable response) {
-                validateJoinRequest(
-                    joinRequest,
-                    ActionListener.runBefore(joinListener, () -> Releasables.close(response))
-                        .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))
-                );
+                SubscribableListener
+                    // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
+                    .<Void>newForked(l -> validateJoinRequest(joinRequest, l))
+
+                    // Adds the joining node to the cluster state
+                    .<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
+                        // #ES-11449
+                        if (e instanceof FailedToCommitClusterStateException) {
+
+                            // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
+                            // the next master may have already accepted the state that we just published and will therefore include the
+                            // joining node in its future states too. Thus we need to wait for the next committed state before we know the
+                            // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
+                            // the listener.
+
+                            // NB we are on the master update thread here at the end of processing the failed cluster state update, so this
+                            // all happens before any cluster state update that re-elects a master
+                            assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);
+
+                            final ClusterStateListener clusterStateListener = new ClusterStateListener() {
+                                @Override
+                                public void clusterChanged(ClusterChangedEvent event) {
+                                    final var discoveryNodes = event.state().nodes();
+                                    // Keep the connection open until the next committed state
+                                    if (discoveryNodes.getMasterNode() != null) {
+                                        // Remove this listener to avoid memory leaks
+                                        clusterService.removeListener(this);
+
+                                        if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
+                                            ll.onResponse(null);
+                                        } else {
+                                            ll.onFailure(e);
+                                        }
+                                    }
+                                }
+                            };
+                            clusterService.addListener(clusterStateListener);
+
+                            // Immediate condition check in case another node is elected master
+                            if (clusterService.state().nodes().nodeExists(joinRequest.getSourceNode().getId())) {
A review comment on this block, quoting the SubscribableListener javadoc:

 * If this listener is completed more than once then all results other than the first (whether successful or otherwise) are silently
 * discarded. All subscribed listeners will be notified of the same result, exactly once, even if several completions occur concurrently.

However we can't rely on that being true in future; it's not guaranteed that ll will always be a SubscribableListener in this context. I'd prefer we explicitly deduplicated this work, e.g. by creating another SubscribableListener.
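A minimal sketch of that suggestion, not code from this PR, assuming the surrounding handleJoinRequest scope from the diff above (joinRequest, clusterService, the delegate listener ll and the caught exception e): funnel both completion paths into one dedicated SubscribableListener, whose first-completion-wins contract makes any duplicate completion harmless by construction.

// Sketch only: route both the "wait for the next committed state" path and the
// immediate check through a single SubscribableListener, so the original listener
// ll is completed exactly once no matter which path fires first (or if both fire).
final SubscribableListener<Void> joinOutcome = new SubscribableListener<>();
joinOutcome.addListener(ll); // ll observes only the first completion

final ClusterStateListener clusterStateListener = new ClusterStateListener() {
    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        final var discoveryNodes = event.state().nodes();
        if (discoveryNodes.getMasterNode() != null) {
            clusterService.removeListener(this);
            if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
                joinOutcome.onResponse(null);
            } else {
                joinOutcome.onFailure(e);
            }
        }
    }
};
clusterService.addListener(clusterStateListener);

// Immediate check in case a committed state containing the joining node has already been applied;
// if the listener above also fires later, its completion of joinOutcome is silently discarded.
if (clusterService.state().nodes().nodeExists(joinRequest.getSourceNode().getId())) {
    joinOutcome.onResponse(null);
}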
Reply:

"However we can't rely on that being true in future"

I contemplated making it not be true by adding a check that these listeners are not completed multiple times:

diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
index 9eddbb55b776..b7c8c4a28279 100644
--- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
+++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
@@ -135,7 +135,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
      */
     public static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListener<T>, ? extends Exception> fork) {
         final var listener = new SubscribableListener<T>();
-        ActionListener.run(listener, fork::accept);
+        ActionListener.run(ActionListener.assertOnce(listener), fork::accept);
         return listener;
     }

However, on reflection this seems unnecessarily strict, and indeed it causes the SubscribableListener test suite to fail because we actually already assert that newForked and andThen receive the returned listener instance, which is therefore safe to complete more than once. I think it's best to document this fact, see #133391, and then we can rely on it here too (so disregard my previous message).
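For what it's worth, the documented guarantee is easy to see in a standalone fragment (illustration only, assuming the usual imports; not code from this PR):

// Only the first completion of a SubscribableListener is delivered to its subscribers;
// later completions, successful or not, are silently discarded.
SubscribableListener<Void> listener = new SubscribableListener<>();
listener.addListener(ActionListener.wrap(ignored -> {
    System.out.println("completed successfully"); // this is what the subscriber sees
}, exception -> {
    System.out.println("completed exceptionally: " + exception);
}));
listener.onResponse(null);                                   // delivered
listener.onFailure(new ElasticsearchException("too late"));  // discarded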
Another reply:

Btw this test is a really good discriminator of the improvement: it failed for me 9 out of 10 times I ran it having reverted the behaviour change in Coordinator, and passed 10 out of 10 with the change in place. Great stuff.