-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Master node disconnect #132023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Master node disconnect #132023
Changes from 11 commits
21d7b30
829c69f
fb05a2a
962b3b2
4b97f07
2a541b0
eb572d4
3e501b4
b6ddfba
dac102a
d269e47
a668ab1
17e6bd6
cc67155
d47915b
f7cbcbd
c989fd0
c1b1ef2
6708bb4
4150643
170a518
2ee0871
5eec40f
c27fe6a
15d6693
6c3fadc
f45e1ad
b238367
36c9f02
ba8c5e6
4a8f3f7
cdfdb5e
a4ac7fb
cbbacc0
c0a079f
3337cf2
57e5887
25333ca
433b827
9203679
e6b86ef
a180eaf
57db61c
af0c58e
b7922ff
6485e97
1d35bd3
332a86c
98be4ad
0e227f7
00b27f3
8c4de89
d5141bc
ed73225
a7b2f0e
ad45d19
0b4560f
aa82afe
8323493
ff5b4ec
8ad70a2
8316b56
8bf17e5
3f46e13
75b118a
c9f608f
2f493f5
dc7eb96
3683676
cd8502a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.cluster.coordination; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.cluster.ClusterStateApplier; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.core.Releasable; | ||
import org.elasticsearch.test.ClusterServiceUtils; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.CyclicBarrier; | ||
|
||
/** | ||
* An integration test base class to be used when a test requires a master re-election | ||
*/ | ||
public abstract class MasterElectionTestCase extends ESIntegTestCase { | ||
|
||
/** | ||
* Block the cluster state applier on a node. Returns only when applier is blocked. | ||
* | ||
* @param nodeName The name of the node on which to block the applier | ||
* @param cleanupTasks The list of clean up tasks | ||
* @return A cyclic barrier which when awaited on will un-block the applier | ||
*/ | ||
protected static CyclicBarrier blockClusterStateApplier(String nodeName, ArrayList<Releasable> cleanupTasks) { | ||
final var stateApplierBarrier = new CyclicBarrier(2); | ||
internalCluster().getInstance(ClusterService.class, nodeName).getClusterApplierService().onNewClusterState("test", () -> { | ||
// Meet to signify application is blocked | ||
safeAwait(stateApplierBarrier); | ||
// Wait for the signal to unblock | ||
safeAwait(stateApplierBarrier); | ||
return null; | ||
}, ActionListener.noop()); | ||
cleanupTasks.add(stateApplierBarrier::reset); | ||
|
||
// Wait until state application is blocked | ||
safeAwait(stateApplierBarrier); | ||
return stateApplierBarrier; | ||
} | ||
|
||
/** | ||
* Configure a latch that will be released when the existing master knows of the new master's election | ||
* | ||
* @param newMaster The name of the newMaster node | ||
* @param cleanupTasks The list of cleanup tasks | ||
* @return A latch that will be released when the old master acknowledges the new master's election | ||
*/ | ||
protected CountDownLatch configureElectionLatchForNewMaster(String newMaster, List<Releasable> cleanupTasks) { | ||
|
||
final String originalMasterName = internalCluster().getMasterName(); | ||
logger.info("Original master was {}, new master will be {}", originalMasterName, newMaster); | ||
final var previousMasterKnowsNewMasterIsElectedLatch = new CountDownLatch(1); | ||
ClusterStateApplier newMasterMonitor = event -> { | ||
DiscoveryNode masterNode = event.state().nodes().getMasterNode(); | ||
if (masterNode != null && masterNode.getName().equals(newMaster)) { | ||
previousMasterKnowsNewMasterIsElectedLatch.countDown(); | ||
} | ||
}; | ||
ClusterService originalMasterClusterService = internalCluster().getInstance(ClusterService.class, originalMasterName); | ||
originalMasterClusterService.addStateApplier(newMasterMonitor); | ||
cleanupTasks.add(() -> originalMasterClusterService.removeApplier(newMasterMonitor)); | ||
return previousMasterKnowsNewMasterIsElectedLatch; | ||
} | ||
|
||
/** | ||
* Configure a latch that will be released when the existing master knows it has been re-elected | ||
* | ||
* @param masterNodeName The name of the current master node | ||
* @param electedTerm The term the current master node was elected | ||
* @param cleanupTasks The list of cleanup tasks | ||
* @return A latch that will be released when the master acknowledges it's re-election | ||
*/ | ||
protected CountDownLatch configureElectionLatchForReElectedMaster( | ||
|
||
String masterNodeName, | ||
long electedTerm, | ||
List<Releasable> cleanupTasks | ||
) { | ||
final var masterKnowsItIsReElectedLatch = new CountDownLatch(1); | ||
ClusterStateApplier newMasterMonitor = event -> { | ||
DiscoveryNode masterNode = event.state().nodes().getMasterNode(); | ||
long currentTerm = event.state().coordinationMetadata().term(); | ||
if (masterNode != null && masterNode.getName().equals(masterNodeName) && currentTerm > electedTerm) { | ||
logger.info("Master knows it's re-elected"); | ||
masterKnowsItIsReElectedLatch.countDown(); | ||
} | ||
}; | ||
ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, masterNodeName); | ||
masterClusterService.addStateApplier(newMasterMonitor); | ||
cleanupTasks.add(() -> masterClusterService.removeApplier(newMasterMonitor)); | ||
return masterKnowsItIsReElectedLatch; | ||
} | ||
|
||
/** | ||
* Add some master-only nodes and block until they've joined the cluster | ||
* <p> | ||
* Ensure that we've got 5 voting nodes in the cluster, this means even if the original | ||
* master accepts its own failed state update before standing down, we can still | ||
* establish a quorum without its (or our own) join. | ||
*/ | ||
protected static String ensureSufficientMasterEligibleNodes() { | ||
|
||
final var votingConfigSizeListener = ClusterServiceUtils.addTemporaryStateListener( | ||
cs -> 5 <= cs.coordinationMetadata().getLastCommittedConfiguration().getNodeIds().size() | ||
); | ||
|
||
try { | ||
final var newNodeNames = internalCluster().startMasterOnlyNodes(Math.max(1, 5 - internalCluster().numMasterNodes())); | ||
safeAwait(votingConfigSizeListener); | ||
return newNodeNames.get(0); | ||
} finally { | ||
votingConfigSizeListener.onResponse(null); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I don't think you need to pull this one up to the base class, it only has one caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in a668ab1