|
17 | 17 | import org.elasticsearch.action.support.ActionFilters;
|
18 | 18 | import org.elasticsearch.action.support.PlainActionFuture;
|
19 | 19 | import org.elasticsearch.cluster.ClusterState;
|
| 20 | +import org.elasticsearch.cluster.ClusterStateApplier; |
20 | 21 | import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
21 | 22 | import org.elasticsearch.cluster.block.ClusterBlockException;
|
22 | 23 | import org.elasticsearch.cluster.coordination.LeaderChecker;
|
23 |
| -import org.elasticsearch.cluster.coordination.MasterElectionTestCase; |
24 | 24 | import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
|
25 | 25 | import org.elasticsearch.cluster.coordination.StatefulPreVoteCollector;
|
26 | 26 | import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
| 27 | +import org.elasticsearch.cluster.node.DiscoveryNode; |
27 | 28 | import org.elasticsearch.cluster.service.ClusterService;
|
28 | 29 | import org.elasticsearch.common.io.stream.StreamInput;
|
29 | 30 | import org.elasticsearch.common.settings.Settings;
|
|
34 | 35 | import org.elasticsearch.plugins.ActionPlugin;
|
35 | 36 | import org.elasticsearch.plugins.Plugin;
|
36 | 37 | import org.elasticsearch.tasks.Task;
|
| 38 | +import org.elasticsearch.test.ClusterServiceUtils; |
| 39 | +import org.elasticsearch.test.ESIntegTestCase; |
37 | 40 | import org.elasticsearch.test.transport.MockTransportService;
|
38 | 41 | import org.elasticsearch.threadpool.ThreadPool;
|
39 | 42 | import org.elasticsearch.transport.TransportService;
|
|
42 | 45 | import java.util.ArrayList;
|
43 | 46 | import java.util.Collection;
|
44 | 47 | import java.util.List;
|
| 48 | +import java.util.concurrent.CountDownLatch; |
| 49 | +import java.util.concurrent.CyclicBarrier; |
45 | 50 |
|
46 | 51 | import static org.hamcrest.Matchers.equalTo;
|
47 | 52 | import static org.hamcrest.Matchers.greaterThan;
|
48 | 53 |
|
49 |
| -public class TransportMasterNodeActionIT extends MasterElectionTestCase { |
| 54 | +public class TransportMasterNodeActionIT extends ESIntegTestCase { |
50 | 55 |
|
51 | 56 | @SuppressWarnings("unchecked")
|
52 | 57 | @Override
|
@@ -81,7 +86,7 @@ public void testRoutingLoopProtection() {
|
81 | 86 | .get()
|
82 | 87 | .getState()
|
83 | 88 | .term();
|
84 |
| - final var previousMasterKnowsNewMasterIsElectedLatch = configureElectionLatchForNewMaster(newMaster, cleanupTasks); |
| 89 | + final var previousMasterKnowsNewMasterIsElectedLatch = configureElectionLatch(newMaster, cleanupTasks); |
85 | 90 |
|
86 | 91 | final var newMasterReceivedReroutedMessageFuture = new PlainActionFuture<>();
|
87 | 92 | final var newMasterReceivedReroutedMessageListener = ActionListener.assertOnce(newMasterReceivedReroutedMessageFuture);
|
@@ -153,6 +158,73 @@ public void onFailure(Exception e) {
|
153 | 158 | }
|
154 | 159 | }
|
155 | 160 |
|
| 161 | + /** |
| 162 | + * Block the cluster state applier on a node. Returns only when applier is blocked. |
| 163 | + * |
| 164 | + * @param nodeName The name of the node on which to block the applier |
| 165 | + * @param cleanupTasks The list of clean up tasks |
| 166 | + * @return A cyclic barrier which when awaited on will un-block the applier |
| 167 | + */ |
| 168 | + private static CyclicBarrier blockClusterStateApplier(String nodeName, ArrayList<Releasable> cleanupTasks) { |
| 169 | + final var stateApplierBarrier = new CyclicBarrier(2); |
| 170 | + internalCluster().getInstance(ClusterService.class, nodeName).getClusterApplierService().onNewClusterState("test", () -> { |
| 171 | + // Meet to signify application is blocked |
| 172 | + safeAwait(stateApplierBarrier); |
| 173 | + // Wait for the signal to unblock |
| 174 | + safeAwait(stateApplierBarrier); |
| 175 | + return null; |
| 176 | + }, ActionListener.noop()); |
| 177 | + cleanupTasks.add(stateApplierBarrier::reset); |
| 178 | + |
| 179 | + // Wait until state application is blocked |
| 180 | + safeAwait(stateApplierBarrier); |
| 181 | + return stateApplierBarrier; |
| 182 | + } |
| 183 | + |
| 184 | + /** |
| 185 | + * Configure a latch that will be released when the existing master knows of the new master's election |
| 186 | + * |
| 187 | + * @param newMaster The name of the newMaster node |
| 188 | + * @param cleanupTasks The list of cleanup tasks |
| 189 | + * @return A latch that will be released when the old master acknowledges the new master's election |
| 190 | + */ |
| 191 | + private CountDownLatch configureElectionLatch(String newMaster, List<Releasable> cleanupTasks) { |
| 192 | + final String originalMasterName = internalCluster().getMasterName(); |
| 193 | + logger.info("Original master was {}, new master will be {}", originalMasterName, newMaster); |
| 194 | + final var previousMasterKnowsNewMasterIsElectedLatch = new CountDownLatch(1); |
| 195 | + ClusterStateApplier newMasterMonitor = event -> { |
| 196 | + DiscoveryNode masterNode = event.state().nodes().getMasterNode(); |
| 197 | + if (masterNode != null && masterNode.getName().equals(newMaster)) { |
| 198 | + previousMasterKnowsNewMasterIsElectedLatch.countDown(); |
| 199 | + } |
| 200 | + }; |
| 201 | + ClusterService originalMasterClusterService = internalCluster().getInstance(ClusterService.class, originalMasterName); |
| 202 | + originalMasterClusterService.addStateApplier(newMasterMonitor); |
| 203 | + cleanupTasks.add(() -> originalMasterClusterService.removeApplier(newMasterMonitor)); |
| 204 | + return previousMasterKnowsNewMasterIsElectedLatch; |
| 205 | + } |
| 206 | + |
| 207 | + /** |
| 208 | + * Add some master-only nodes and block until they've joined the cluster |
| 209 | + * <p> |
| 210 | + * Ensure that we've got 5 voting nodes in the cluster, this means even if the original |
| 211 | + * master accepts its own failed state update before standing down, we can still |
| 212 | + * establish a quorum without its (or our own) join. |
| 213 | + */ |
| 214 | + private static String ensureSufficientMasterEligibleNodes() { |
| 215 | + final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener( |
| 216 | + cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size() |
| 217 | + ); |
| 218 | + |
| 219 | + try { |
| 220 | + final var newNodeNames = internalCluster().startMasterOnlyNodes(Math.max(1, 5 - internalCluster().numMasterNodes())); |
| 221 | + safeAwait(votingConfigSizeListener); |
| 222 | + return newNodeNames.get(0); |
| 223 | + } finally { |
| 224 | + votingConfigSizeListener.onResponse(null); |
| 225 | + } |
| 226 | + } |
| 227 | + |
156 | 228 | private static final ActionType<ActionResponse.Empty> TEST_ACTION_TYPE = new ActionType<>("internal:test");
|
157 | 229 |
|
158 | 230 | public static final class TestActionPlugin extends Plugin implements ActionPlugin {
|
|
0 commit comments