Skip to content

Commit 7439976

Browse files
Master Node Disconnect (#134213)
Extends the Coordinator so that we don't prematurely close the connection to a joining node. This prevents a `node-join: [{}] with reason [{}]; for troubleshooting guidance, see {}` WARN log being emitted unnecessarily. Closes #126192 Closes ES-11449
1 parent 5a4010c commit 7439976

File tree

8 files changed

+344
-14
lines changed

8 files changed

+344
-14
lines changed
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.coordination;
11+
12+
import org.apache.logging.log4j.Level;
13+
import org.elasticsearch.ElasticsearchException;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.node.DiscoveryNodes;
17+
import org.elasticsearch.cluster.service.ClusterApplierService;
18+
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.cluster.service.MasterService;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.util.CollectionUtils;
22+
import org.elasticsearch.plugins.Plugin;
23+
import org.elasticsearch.test.ClusterServiceUtils;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.elasticsearch.test.MockLog;
26+
import org.elasticsearch.test.junit.annotations.TestLogging;
27+
import org.elasticsearch.test.transport.MockTransportService;
28+
import org.elasticsearch.transport.Transport;
29+
import org.elasticsearch.transport.TransportService;
30+
31+
import java.util.Collection;
32+
import java.util.List;
33+
34+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
35+
public class NodeJoiningIT extends ESIntegTestCase {
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> nodePlugins() {
39+
return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
40+
}
41+
42+
@Override
43+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
44+
return Settings.builder()
45+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
46+
// detect leader failover quickly
47+
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
48+
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
49+
.build();
50+
}
51+
52+
public void testNodeJoinsCluster() {
53+
internalCluster().startNodes(3);
54+
String masterNodeName = internalCluster().getMasterName();
55+
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();
56+
int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName)
57+
.state()
58+
.nodes()
59+
.getMasterNodes()
60+
.size();
61+
List<String> namesOfDataNodesInOriginalCluster = getListOfDataNodeNamesFromCluster(masterNodeName);
62+
63+
// Attempt to add new node
64+
String newNodeName = internalCluster().startDataOnlyNode();
65+
ensureStableCluster(4);
66+
67+
// Assert the new data node was added
68+
ClusterState state = internalCluster().clusterService(masterNodeName).state();
69+
assertEquals(numberOfNodesOriginallyInCluster + 1, state.nodes().getSize());
70+
assertEquals(namesOfDataNodesInOriginalCluster.size() + 1, state.nodes().getDataNodes().size());
71+
assertEquals(numberOfMasterNodesOriginallyInCluster, state.nodes().getMasterNodes().size());
72+
73+
List<String> namesOfDataNodesInNewCluster = getListOfDataNodeNamesFromCluster(masterNodeName);
74+
assertTrue(namesOfDataNodesInNewCluster.contains(newNodeName));
75+
for (String nodeName : namesOfDataNodesInOriginalCluster) {
76+
assertTrue(namesOfDataNodesInNewCluster.contains(nodeName));
77+
}
78+
}
79+
80+
@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO")
81+
public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
82+
List<String> nodeNames = internalCluster().startNodes(3);
83+
ensureStableCluster(3);
84+
String originalMasterNodeName = internalCluster().getMasterName();
85+
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size();
86+
// Determine upfront who we want the next master to be
87+
final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames));
88+
89+
// Ensure the logging is as expected
90+
try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) {
91+
92+
// Sets MockTransportService behaviour
93+
for (final var transportService : internalCluster().getInstances(TransportService.class)) {
94+
final var mockTransportService = asInstanceOf(MockTransportService.class, transportService);
95+
96+
if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName) == false) {
97+
List<String> listOfActionsToBlock = List.of(
98+
// This forces the current master node to fail
99+
PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
100+
// This disables pre-voting on all nodes except the new master, forcing it to win the election
101+
StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME
102+
);
103+
blockActionNameOnMockTransportService(mockTransportService, listOfActionsToBlock);
104+
}
105+
}
106+
107+
// We do not expect to see a WARN log about a node disconnecting (#ES-11449)
108+
addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);
109+
110+
// We haven't changed master nodes yet
111+
assertEquals(originalMasterNodeName, internalCluster().getMasterName());
112+
113+
// Sends a node join request to the original master node. This will fail, and cause a master failover
114+
// startDataOnlyNode waits for the new node to be added, and this can only occur after a re-election
115+
String newNodeName = internalCluster().startDataOnlyNode();
116+
assertNotEquals(originalMasterNodeName, internalCluster().getMasterName());
117+
logger.info("New master is elected");
118+
119+
// Assert all nodes have accepted N into their cluster state
120+
assertNewNodeIsInAllClusterStates(newNodeName);
121+
122+
mockLog.assertAllExpectationsMatched();
123+
124+
// Assert the new data node was added
125+
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
126+
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
127+
assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName));
128+
}
129+
}
130+
131+
/*
132+
In this scenario, node N attempts to join a cluster, there is an election and the original master is re-elected.
133+
Node N should join the cluster, but it should not be disconnected (#ES-11449)
134+
*/
135+
@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO")
136+
public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {
137+
internalCluster().startNodes(3);
138+
ensureStableCluster(3);
139+
String masterNodeName = internalCluster().getMasterName();
140+
141+
long originalTerm = getTerm(masterNodeName);
142+
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();
143+
144+
try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) {
145+
for (String nodeName : internalCluster().getNodeNames()) {
146+
final var mockTransportService = MockTransportService.getInstance(nodeName);
147+
148+
if (nodeName.equals(masterNodeName)) {
149+
// This makes the master fail, forcing a re-election
150+
blockActionNameOnMockTransportService(
151+
mockTransportService,
152+
List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME)
153+
);
154+
155+
// Wait until the master has stepped down before removing the publishing ban
156+
// This allows the master to be re-elected
157+
ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> {
158+
DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
159+
boolean hasMasterSteppedDown = currentMasterNode == null
160+
|| currentMasterNode.getName().equals(masterNodeName) == false;
161+
if (hasMasterSteppedDown) {
162+
logger.info("Master publishing ban removed");
163+
mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
164+
}
165+
return hasMasterSteppedDown;
166+
});
167+
168+
} else {
169+
// This disables pre-voting on all nodes except the master, forcing it to win the election
170+
blockActionNameOnMockTransportService(
171+
mockTransportService,
172+
List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME)
173+
);
174+
}
175+
}
176+
177+
// We expect the node join request to fail with a FailedToCommitClusterStateException
178+
mockLog.addExpectation(
179+
new MockLog.SeenEventExpectation(
180+
"failed to commit cluster state",
181+
MasterService.class.getCanonicalName(),
182+
Level.WARN,
183+
"failed to commit cluster state"
184+
)
185+
);
186+
187+
/*
188+
We expect the cluster to reuse the connection to N and not disconnect it
189+
Therefore, this WARN log should not be thrown (#ES-11449)
190+
*/
191+
addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);
192+
193+
// Before we add the new node, assert we haven't changed master nodes yet
194+
assertEquals(masterNodeName, internalCluster().getMasterName());
195+
196+
// Sends a node join request to the original master node. This will fail, and cause a master failover
197+
logger.info("Sending node join request");
198+
String newNodeName = internalCluster().startDataOnlyNode();
199+
200+
// Assert the master was re-elected
201+
assertEquals(masterNodeName, internalCluster().getMasterName());
202+
assertTrue(originalTerm < getTerm(masterNodeName));
203+
204+
// Assert all nodes have accepted N into their cluster state
205+
assertNewNodeIsInAllClusterStates(newNodeName);
206+
207+
// If the WARN log was thrown, then the connection to N was disconnected so fail the test
208+
mockLog.assertAllExpectationsMatched();
209+
210+
// Assert the new data node was added
211+
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
212+
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
213+
assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName));
214+
}
215+
}
216+
217+
private long getTerm(String masterNodeName) {
218+
return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term();
219+
}
220+
221+
private void assertNewNodeIsInAllClusterStates(String newNodeName) {
222+
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
223+
assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName));
224+
}
225+
}
226+
227+
private List<String> getListOfDataNodeNamesFromCluster(String nodeName) {
228+
return internalCluster().clusterService(nodeName)
229+
.state()
230+
.getNodes()
231+
.getDataNodes()
232+
.values()
233+
.stream()
234+
.map(DiscoveryNode::getName)
235+
.toList();
236+
}
237+
238+
private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) {
239+
mockLog.addExpectation(
240+
new MockLog.UnseenEventExpectation(
241+
"warn message with troubleshooting link",
242+
"org.elasticsearch.cluster.coordination.NodeJoinExecutor",
243+
Level.WARN,
244+
"*"
245+
)
246+
);
247+
}
248+
249+
private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List<String> actionNamesToBlock) {
250+
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
251+
if (actionNamesToBlock.contains(action)) {
252+
throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode());
253+
} else {
254+
connection.sendRequest(requestId, action, request, options);
255+
}
256+
});
257+
}
258+
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import org.elasticsearch.action.support.SubscribableListener;
2020
import org.elasticsearch.action.support.ThreadedActionListener;
2121
import org.elasticsearch.client.internal.Client;
22+
import org.elasticsearch.cluster.ClusterChangedEvent;
2223
import org.elasticsearch.cluster.ClusterName;
2324
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ClusterStateListener;
2426
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
2527
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2628
import org.elasticsearch.cluster.LocalMasterServiceTask;
@@ -40,6 +42,7 @@
4042
import org.elasticsearch.cluster.routing.allocation.AllocationService;
4143
import org.elasticsearch.cluster.service.ClusterApplier;
4244
import org.elasticsearch.cluster.service.ClusterApplierService;
45+
import org.elasticsearch.cluster.service.ClusterService;
4346
import org.elasticsearch.cluster.service.MasterService;
4447
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
4548
import org.elasticsearch.cluster.version.CompatibilityVersions;
@@ -191,6 +194,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
191194
private final NodeHealthService nodeHealthService;
192195
private final List<PeerFinderListener> peerFinderListeners;
193196
private final LeaderHeartbeatService leaderHeartbeatService;
197+
private final ClusterService clusterService;
194198

195199
/**
196200
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -219,7 +223,8 @@ public Coordinator(
219223
LeaderHeartbeatService leaderHeartbeatService,
220224
PreVoteCollector.Factory preVoteCollectorFactory,
221225
CompatibilityVersions compatibilityVersions,
222-
FeatureService featureService
226+
FeatureService featureService,
227+
ClusterService clusterService
223228
) {
224229
this.settings = settings;
225230
this.transportService = transportService;
@@ -329,6 +334,7 @@ public Coordinator(
329334
this.peerFinderListeners.add(clusterBootstrapService);
330335
this.leaderHeartbeatService = leaderHeartbeatService;
331336
this.compatibilityVersions = compatibilityVersions;
337+
this.clusterService = clusterService;
332338
}
333339

334340
/**
@@ -663,11 +669,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi
663669
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
664670
@Override
665671
public void onResponse(Releasable response) {
666-
validateJoinRequest(
667-
joinRequest,
668-
ActionListener.runBefore(joinListener, () -> Releasables.close(response))
669-
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))
670-
);
672+
SubscribableListener
673+
// Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
674+
.<Void>newForked(l -> validateJoinRequest(joinRequest, l))
675+
676+
// Adds the joining node to the cluster state
677+
.<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
678+
// #ES-11449
679+
if (e instanceof FailedToCommitClusterStateException) {
680+
// The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
681+
// the next master may have already accepted the state that we just published and will therefore include the
682+
// joining node in its future states too. Thus, we need to wait for the next committed state before we know the
683+
// eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
684+
// the listener.
685+
686+
// NB we are on the master update thread here at the end of processing the failed cluster state update, so this
687+
// all happens before any cluster state update that re-elects a master
688+
assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);
689+
690+
final ClusterStateListener clusterStateListener = new ClusterStateListener() {
691+
@Override
692+
public void clusterChanged(ClusterChangedEvent event) {
693+
final var discoveryNodes = event.state().nodes();
694+
// Keep the connection open until the next committed state
695+
if (discoveryNodes.getMasterNode() != null) {
696+
// Remove this listener to avoid memory leaks
697+
clusterService.removeListener(this);
698+
if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
699+
ll.onResponse(null);
700+
} else {
701+
ll.onFailure(e);
702+
}
703+
}
704+
}
705+
};
706+
clusterService.addListener(clusterStateListener);
707+
clusterStateListener.clusterChanged(
708+
new ClusterChangedEvent(
709+
"Checking if another master has been elected since "
710+
+ joinRequest.getSourceNode().getName()
711+
+ " attempted to join cluster",
712+
clusterService.state(),
713+
clusterService.state()
714+
)
715+
);
716+
} else {
717+
ll.onFailure(e);
718+
}
719+
})))
720+
721+
// Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.
722+
.addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));
671723
}
672724

673725
@Override

0 commit comments

Comments
 (0)