Skip to content

Commit 4b97f07

Browse files
Adds Node Joining Integration Tests
Adds two new integration test suites, `NodeJoiningIT` and `NodeJoiningMasterElectionIT`. These included tests related to the coordinator logic when a node joins the cluster. There is also a proposed solution in the Coordinator class, currently commented out. This will be uncommented in a follow up commit
1 parent 962b3b2 commit 4b97f07

File tree

13 files changed

+789
-187
lines changed

13 files changed

+789
-187
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/support/master/TransportMasterNodeActionIT.java

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@
1717
import org.elasticsearch.action.support.ActionFilters;
1818
import org.elasticsearch.action.support.PlainActionFuture;
1919
import org.elasticsearch.cluster.ClusterState;
20-
import org.elasticsearch.cluster.ClusterStateApplier;
2120
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2221
import org.elasticsearch.cluster.block.ClusterBlockException;
2322
import org.elasticsearch.cluster.coordination.LeaderChecker;
23+
import org.elasticsearch.cluster.coordination.MasterElectionTestCase;
2424
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
2525
import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
2626
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
27-
import org.elasticsearch.cluster.node.DiscoveryNode;
2827
import org.elasticsearch.cluster.service.ClusterService;
2928
import org.elasticsearch.common.io.stream.StreamInput;
3029
import org.elasticsearch.common.settings.Settings;
@@ -35,8 +34,6 @@
3534
import org.elasticsearch.plugins.ActionPlugin;
3635
import org.elasticsearch.plugins.Plugin;
3736
import org.elasticsearch.tasks.Task;
38-
import org.elasticsearch.test.ClusterServiceUtils;
39-
import org.elasticsearch.test.ESIntegTestCase;
4037
import org.elasticsearch.test.transport.MockTransportService;
4138
import org.elasticsearch.threadpool.ThreadPool;
4239
import org.elasticsearch.transport.TransportService;
@@ -45,13 +42,11 @@
4542
import java.util.ArrayList;
4643
import java.util.Collection;
4744
import java.util.List;
48-
import java.util.concurrent.CountDownLatch;
49-
import java.util.concurrent.CyclicBarrier;
5045

5146
import static org.hamcrest.Matchers.equalTo;
5247
import static org.hamcrest.Matchers.greaterThan;
5348

54-
public class TransportMasterNodeActionIT extends ESIntegTestCase {
49+
public class TransportMasterNodeActionIT extends MasterElectionTestCase {
5550

5651
@SuppressWarnings("unchecked")
5752
@Override
@@ -86,7 +81,7 @@ public void testRoutingLoopProtection() {
8681
.get()
8782
.getState()
8883
.term();
89-
final var previousMasterKnowsNewMasterIsElectedLatch = configureElectionLatch(newMaster, cleanupTasks);
84+
final var previousMasterKnowsNewMasterIsElectedLatch = configureElectionLatchForNewMaster(newMaster, cleanupTasks);
9085

9186
final var newMasterReceivedReroutedMessageFuture = new PlainActionFuture<>();
9287
final var newMasterReceivedReroutedMessageListener = ActionListener.assertOnce(newMasterReceivedReroutedMessageFuture);
@@ -158,73 +153,6 @@ public void onFailure(Exception e) {
158153
}
159154
}
160155

161-
/**
162-
* Block the cluster state applier on a node. Returns only when applier is blocked.
163-
*
164-
* @param nodeName The name of the node on which to block the applier
165-
* @param cleanupTasks The list of clean up tasks
166-
* @return A cyclic barrier which when awaited on will un-block the applier
167-
*/
168-
private static CyclicBarrier blockClusterStateApplier(String nodeName, ArrayList<Releasable> cleanupTasks) {
169-
final var stateApplierBarrier = new CyclicBarrier(2);
170-
internalCluster().getInstance(ClusterService.class, nodeName).getClusterApplierService().onNewClusterState("test", () -> {
171-
// Meet to signify application is blocked
172-
safeAwait(stateApplierBarrier);
173-
// Wait for the signal to unblock
174-
safeAwait(stateApplierBarrier);
175-
return null;
176-
}, ActionListener.noop());
177-
cleanupTasks.add(stateApplierBarrier::reset);
178-
179-
// Wait until state application is blocked
180-
safeAwait(stateApplierBarrier);
181-
return stateApplierBarrier;
182-
}
183-
184-
/**
185-
* Configure a latch that will be released when the existing master knows of the new master's election
186-
*
187-
* @param newMaster The name of the newMaster node
188-
* @param cleanupTasks The list of cleanup tasks
189-
* @return A latch that will be released when the old master acknowledges the new master's election
190-
*/
191-
private CountDownLatch configureElectionLatch(String newMaster, List<Releasable> cleanupTasks) {
192-
final String originalMasterName = internalCluster().getMasterName();
193-
logger.info("Original master was {}, new master will be {}", originalMasterName, newMaster);
194-
final var previousMasterKnowsNewMasterIsElectedLatch = new CountDownLatch(1);
195-
ClusterStateApplier newMasterMonitor = event -> {
196-
DiscoveryNode masterNode = event.state().nodes().getMasterNode();
197-
if (masterNode != null && masterNode.getName().equals(newMaster)) {
198-
previousMasterKnowsNewMasterIsElectedLatch.countDown();
199-
}
200-
};
201-
ClusterService originalMasterClusterService = internalCluster().getInstance(ClusterService.class, originalMasterName);
202-
originalMasterClusterService.addStateApplier(newMasterMonitor);
203-
cleanupTasks.add(() -> originalMasterClusterService.removeApplier(newMasterMonitor));
204-
return previousMasterKnowsNewMasterIsElectedLatch;
205-
}
206-
207-
/**
208-
* Add some master-only nodes and block until they've joined the cluster
209-
* <p>
210-
* Ensure that we've got 5 voting nodes in the cluster, this means even if the original
211-
* master accepts its own failed state update before standing down, we can still
212-
* establish a quorum without its (or our own) join.
213-
*/
214-
private static String ensureSufficientMasterEligibleNodes() {
215-
final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener(
216-
cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()
217-
);
218-
219-
try {
220-
final var newNodeNames = internalCluster().startMasterOnlyNodes(Math.max(1, 5 - internalCluster().numMasterNodes()));
221-
safeAwait(votingConfigSizeListener);
222-
return newNodeNames.get(0);
223-
} finally {
224-
votingConfigSizeListener.onResponse(null);
225-
}
226-
}
227-
228156
private static final ActionType<ActionResponse.Empty> TEST_ACTION_TYPE = new ActionType<>("internal:test");
229157

230158
public static final class TestActionPlugin extends Plugin implements ActionPlugin {
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.coordination;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.cluster.ClusterStateApplier;
14+
import org.elasticsearch.cluster.node.DiscoveryNode;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.core.Releasable;
17+
import org.elasticsearch.test.ClusterServiceUtils;
18+
import org.elasticsearch.test.ESIntegTestCase;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.CyclicBarrier;
24+
25+
public abstract class MasterElectionTestCase extends ESIntegTestCase {
26+
/**
27+
* Block the cluster state applier on a node. Returns only when applier is blocked.
28+
*
29+
* @param nodeName The name of the node on which to block the applier
30+
* @param cleanupTasks The list of clean up tasks
31+
* @return A cyclic barrier which when awaited on will un-block the applier
32+
*/
33+
protected static CyclicBarrier blockClusterStateApplier(String nodeName, ArrayList<Releasable> cleanupTasks) {
34+
final var stateApplierBarrier = new CyclicBarrier(2);
35+
internalCluster().getInstance(ClusterService.class, nodeName).getClusterApplierService().onNewClusterState("test", () -> {
36+
// Meet to signify application is blocked
37+
safeAwait(stateApplierBarrier);
38+
// Wait for the signal to unblock
39+
safeAwait(stateApplierBarrier);
40+
return null;
41+
}, ActionListener.noop());
42+
cleanupTasks.add(stateApplierBarrier::reset);
43+
44+
// Wait until state application is blocked
45+
safeAwait(stateApplierBarrier);
46+
return stateApplierBarrier;
47+
}
48+
49+
/**
50+
* Configure a latch that will be released when the existing master knows of the new master's election
51+
*
52+
* @param newMaster The name of the newMaster node
53+
* @param cleanupTasks The list of cleanup tasks
54+
* @return A latch that will be released when the old master acknowledges the new master's election
55+
*/
56+
protected CountDownLatch configureElectionLatchForNewMaster(String newMaster, List<Releasable> cleanupTasks) {
57+
final String originalMasterName = internalCluster().getMasterName();
58+
logger.info("Original master was {}, new master will be {}", originalMasterName, newMaster);
59+
final var previousMasterKnowsNewMasterIsElectedLatch = new CountDownLatch(1);
60+
ClusterStateApplier newMasterMonitor = event -> {
61+
DiscoveryNode masterNode = event.state().nodes().getMasterNode();
62+
if (masterNode != null && masterNode.getName().equals(newMaster)) {
63+
previousMasterKnowsNewMasterIsElectedLatch.countDown();
64+
}
65+
};
66+
ClusterService originalMasterClusterService = internalCluster().getInstance(ClusterService.class, originalMasterName);
67+
originalMasterClusterService.addStateApplier(newMasterMonitor);
68+
cleanupTasks.add(() -> originalMasterClusterService.removeApplier(newMasterMonitor));
69+
return previousMasterKnowsNewMasterIsElectedLatch;
70+
}
71+
72+
/**
73+
* Configure a latch that will be released when the existing master knows it has been re-elected
74+
*
75+
* @param masterNodeName The name of the current master node
76+
* @param originalTerm The term the current master node was elected
77+
* @param cleanupTasks The list of cleanup tasks
78+
* @return A latch that will be released when the master acknowledges it's re-election
79+
*/
80+
protected CountDownLatch configureElectionLatchForReElectedMaster(String masterNodeName, long originalTerm, List<Releasable> cleanupTasks) {
81+
final var masterKnowsItIsReElectedLatch = new CountDownLatch(1);
82+
ClusterStateApplier newMasterMonitor = event -> {
83+
DiscoveryNode masterNode = event.state().nodes().getMasterNode();
84+
long currentTerm = event.state().coordinationMetadata().term();
85+
if (masterNode != null && masterNode.getName().equals(masterNodeName) && currentTerm > originalTerm) {
86+
masterKnowsItIsReElectedLatch.countDown();
87+
}
88+
};
89+
ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, masterNodeName);
90+
masterClusterService.addStateApplier(newMasterMonitor);
91+
cleanupTasks.add(() -> masterClusterService.removeApplier(newMasterMonitor));
92+
return masterKnowsItIsReElectedLatch;
93+
}
94+
95+
/**
96+
* Add some master-only nodes and block until they've joined the cluster
97+
* <p>
98+
* Ensure that we've got 5 voting nodes in the cluster, this means even if the original
99+
* master accepts its own failed state update before standing down, we can still
100+
* establish a quorum without its (or our own) join.
101+
*/
102+
protected static String ensureSufficientMasterEligibleNodes() {
103+
final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener(
104+
cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size()
105+
);
106+
107+
try {
108+
final var newNodeNames = internalCluster().startMasterOnlyNodes(Math.max(1, 5 - internalCluster().numMasterNodes()));
109+
safeAwait(votingConfigSizeListener);
110+
return newNodeNames.get(0);
111+
} finally {
112+
votingConfigSizeListener.onResponse(null);
113+
}
114+
}
115+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.coordination;
11+
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.common.util.CollectionUtils;
15+
import org.elasticsearch.plugins.Plugin;
16+
import org.elasticsearch.test.ESIntegTestCase;
17+
import org.elasticsearch.test.transport.MockTransportService;
18+
19+
import java.util.Collection;
20+
import java.util.List;
21+
import java.util.Set;
22+
import java.util.stream.Collectors;
23+
24+
@ESIntegTestCase.ClusterScope(
25+
scope = ESIntegTestCase.Scope.TEST,
26+
numDataNodes = 0,
27+
numClientNodes = 0,
28+
autoManageMasterNodes = false
29+
)
30+
public class NodeJoiningIT extends MasterElectionTestCase {
31+
32+
@Override
33+
protected Collection<Class<? extends Plugin>> nodePlugins() {
34+
return CollectionUtils.appendToCopyNoNullElements(
35+
super.nodePlugins(),
36+
MockTransportService.TestPlugin.class
37+
);
38+
}
39+
40+
public void testNodeJoinsCluster() {
41+
internalCluster().setBootstrapMasterNodeIndex(0);
42+
String masterNodeName = internalCluster().startMasterOnlyNode();
43+
String dataNodeName = internalCluster().startDataOnlyNode();
44+
ensureStableCluster(2);
45+
46+
String newNodeName = internalCluster().startDataOnlyNode();
47+
ensureStableCluster(3);
48+
49+
// Assert the new data node was added
50+
ClusterState state = internalCluster().clusterService().state();
51+
assertEquals(3, state.nodes().getSize());
52+
assertEquals(2, state.nodes().getDataNodes().size());
53+
assertEquals(1, state.nodes().getMasterNodes().size());
54+
55+
assertEquals(masterNodeName, state.nodes().getMasterNode().getName());
56+
Set<String> allDataNodeNames = state.nodes().getDataNodes().values()
57+
.stream()
58+
.map(DiscoveryNode::getName)
59+
.collect(Collectors.toSet());
60+
assertTrue(allDataNodeNames.contains(dataNodeName));
61+
assertTrue(allDataNodeNames.contains(newNodeName));
62+
}
63+
64+
public void testNodeFailsToJoinClusterWhenMasterNodeCannotPublishState() {
65+
internalCluster().setBootstrapMasterNodeIndex(0);
66+
String masterNodeName = internalCluster().startMasterOnlyNode();
67+
String dataNodeName = internalCluster().startDataOnlyNode();
68+
ensureStableCluster(2);
69+
70+
final var masterNodeTransportService = MockTransportService.getInstance(masterNodeName);
71+
masterNodeTransportService.addRequestHandlingBehavior(
72+
PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
73+
(handler, request, channel, task) -> channel.sendResponse(new IllegalStateException("cluster state updates blocked"))
74+
);
75+
76+
// This will fail because the master cannot publish state
77+
String newNodeName = internalCluster().startNode();
78+
79+
// Assert no new node was added
80+
ClusterState state = internalCluster().clusterService().state();
81+
assertEquals(2, state.nodes().getSize());
82+
assertEquals(1, state.nodes().getDataNodes().size());
83+
assertEquals(1, state.nodes().getMasterNodes().size());
84+
85+
// Assert the only nodes in the cluster are the original ones
86+
assertEquals(masterNodeName, state.nodes().getMasterNode().getName());
87+
List<String> allDataNodeNames = state.nodes().getDataNodes().values()
88+
.stream()
89+
.map(DiscoveryNode::getName)
90+
.toList();
91+
assertTrue(allDataNodeNames.contains(dataNodeName));
92+
assertFalse(allDataNodeNames.contains(newNodeName));
93+
94+
masterNodeTransportService.clearAllRules();
95+
}
96+
97+
public void testNodeFailsToJoinClusterWhenDataNodeCannotReceiveState() {
98+
internalCluster().setBootstrapMasterNodeIndex(0);
99+
String masterNodeName = internalCluster().startMasterOnlyNode();
100+
String dataNodeName = internalCluster().startDataOnlyNode();
101+
ensureStableCluster(2);
102+
103+
// block publications received by non-master node.
104+
final var dataNodeTransportService = MockTransportService.getInstance(dataNodeName);
105+
dataNodeTransportService.addRequestHandlingBehavior(
106+
PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
107+
(handler, request, channel, task) -> channel.sendResponse(new IllegalStateException("cluster state updates blocked"))
108+
);
109+
110+
// This will fail because the cluster state was not published to the data node, and therefore not acknowledged
111+
String newNodeName = internalCluster().startNode();
112+
113+
// Assert no new node was added
114+
ClusterState state = internalCluster().clusterService().state();
115+
assertEquals(2, state.nodes().getSize());
116+
assertEquals(1, state.nodes().getDataNodes().size());
117+
assertEquals(1, state.nodes().getMasterNodes().size());
118+
119+
// Assert the only nodes in the cluster are the original ones
120+
assertEquals(masterNodeName, state.nodes().getMasterNode().getName());
121+
List<String> allDataNodeNames = state.nodes().getDataNodes().values()
122+
.stream()
123+
.map(DiscoveryNode::getName)
124+
.toList();
125+
assertTrue(allDataNodeNames.contains(dataNodeName));
126+
assertFalse(allDataNodeNames.contains(newNodeName));
127+
128+
dataNodeTransportService.clearAllRules();
129+
}
130+
}

0 commit comments

Comments
 (0)