Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
21d7b30
Don't fail on first node-join publication failure
joshua-adams-1 Jul 28, 2025
829c69f
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Jul 28, 2025
fb05a2a
Checkstyle
joshua-adams-1 Jul 28, 2025
962b3b2
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Jul 28, 2025
4b97f07
Adds Node Joining Integration Tests
joshua-adams-1 Aug 7, 2025
2a541b0
Adds nodeExistsWithName method
joshua-adams-1 Aug 7, 2025
eb572d4
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 7, 2025
3e501b4
Minor tweaks
joshua-adams-1 Aug 7, 2025
b6ddfba
Uncomments out solution, and makes NodeJoiningIT
joshua-adams-1 Aug 11, 2025
dac102a
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 11, 2025
d269e47
Ran ./gradlew spotlessApply precommit
joshua-adams-1 Aug 11, 2025
a668ab1
Modify Coordinator Logic
joshua-adams-1 Aug 12, 2025
17e6bd6
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 14, 2025
cc67155
Generalise logging expectations
joshua-adams-1 Aug 14, 2025
d47915b
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 14, 2025
f7cbcbd
[CI] Auto commit changes from spotless
Aug 14, 2025
c989fd0
Add YAML test for "missing lookup key" scenario (#132870)
smalyshev Aug 14, 2025
c1b1ef2
Add memory accounting to exponential histogram library. (#132580)
JonasKunz Aug 14, 2025
6708bb4
Vectorize BQVectorUtils#packAsBinary (#132923)
iverase Aug 14, 2025
4150643
Mute org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT test {p0=…
elasticsearchmachine Aug 14, 2025
170a518
Remove mutes for resolved CsvTests issues (#132924)
idegtiarenko Aug 14, 2025
2ee0871
ESQL - Allow null values in vector similarity functions (#132919)
carlosdelest Aug 14, 2025
5eec40f
Vectorize BQSpaceUtils#transposeHalfByte (#132935)
iverase Aug 14, 2025
c27fe6a
Send max of two types of max queue latency to ClusterInfo (#132675)
DiannaHohensee Aug 14, 2025
15d6693
[DiskBBQ] Replace n_probe, related to the number of centroids with v…
iverase Aug 14, 2025
6c3fadc
[ML] Add spec files for Llama and AI21 (#132724)
jonathan-buttner Aug 14, 2025
f45e1ad
Remove awaits for closed issues (#132306)
smalyshev Aug 14, 2025
b238367
Suppport per-project behavior in ESQL extra verifiers (#131884)
mark-vieira Aug 14, 2025
36c9f02
Add random tests with match_only_text multi-field (#132380)
parkertimmins Aug 14, 2025
ba8c5e6
Store ignored source in unique stored fields per entry (#132142)
jordan-powers Aug 14, 2025
4a8f3f7
Rename skipping logic to remove hard link to skip_unavailable (#132861)
smalyshev Aug 14, 2025
cdfdb5e
Mute org.elasticsearch.index.mapper.LongFieldMapperTests testFetchMan…
elasticsearchmachine Aug 14, 2025
a4ac7fb
Adding simulate ingest effective mapping (#132833)
masseyke Aug 14, 2025
cbbacc0
Precompute the BitsetCacheKey hashCode (#132875)
joegallo Aug 14, 2025
c0a079f
Fix failing UT by adding a required capability (#132947)
julian-elastic Aug 14, 2025
3337cf2
Mute org.elasticsearch.index.mapper.LongFieldMapperTests testFetch #1…
elasticsearchmachine Aug 14, 2025
57e5887
Remove CrossClusterCancellationIT.createLocalIndex() (#132952)
JeremyDahlgren Aug 14, 2025
25333ca
Unmuting simulate index data stream mapping overrides yaml rest test …
masseyke Aug 14, 2025
433b827
Mute org.elasticsearch.cluster.ClusterInfoServiceIT testMaxQueueLaten…
elasticsearchmachine Aug 14, 2025
9203679
Introduce execution location marker for better handling of remote/loc…
smalyshev Aug 14, 2025
e6b86ef
Implement v_magnitude function (#132765)
svilen-mihaylov-elastic Aug 14, 2025
a180eaf
Breakdown undesired allocations by shard routing role (#132235)
nicktindall Aug 14, 2025
57db61c
Switch to PR-based benchmark pipeline defined in ES repo (#132941)
gbanasiak Aug 15, 2025
af0c58e
Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {…
elasticsearchmachine Aug 15, 2025
b7922ff
Implement WriteLoadConstraintDecider#canAllocate (#132041)
DiannaHohensee Aug 15, 2025
6485e97
Simplify EsqlSession (#132848)
idegtiarenko Aug 15, 2025
1d35bd3
Mute org.elasticsearch.index.mapper.LongFieldMapperTests testSyntheti…
elasticsearchmachine Aug 15, 2025
332a86c
Speed up loading keyword fields with index sorts (#132950)
dnhatn Aug 15, 2025
98be4ad
Merge remote-tracking branch 'upstream/main' into master-node-disconnect
joshua-adams-1 Aug 15, 2025
0e227f7
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 15, 2025
00b27f3
Remove logger.info
joshua-adams-1 Aug 15, 2025
8c4de89
David Turner Comments
joshua-adams-1 Aug 19, 2025
d5141bc
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 19, 2025
ed73225
Fix unit tests
joshua-adams-1 Aug 20, 2025
a7b2f0e
Merge branch 'master-node-disconnect' of github.com:joshua-adams-1/el…
joshua-adams-1 Aug 20, 2025
ad45d19
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 20, 2025
0b4560f
Comments
joshua-adams-1 Aug 22, 2025
aa82afe
Unused method
joshua-adams-1 Aug 22, 2025
8323493
Merge branch 'master-node-disconnect' of github.com:joshua-adams-1/el…
joshua-adams-1 Aug 22, 2025
ff5b4ec
[CI] Auto commit changes from spotless
Aug 22, 2025
8ad70a2
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 22, 2025
8316b56
Changes
joshua-adams-1 Aug 22, 2025
8bf17e5
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 26, 2025
3f46e13
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 27, 2025
75b118a
David Comments
joshua-adams-1 Sep 1, 2025
c9f608f
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Sep 1, 2025
2f493f5
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Sep 2, 2025
dc7eb96
Merge branch 'master-node-disconnect' of github.com:joshua-adams-1/el…
joshua-adams-1 Sep 2, 2025
3683676
David Comments
joshua-adams-1 Sep 2, 2025
cd8502a
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.Level;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.List;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class NodeJoiningIT extends ESIntegTestCase {

/**
 * Returns the plugins supplied by the superclass plus {@link MockTransportService.TestPlugin}, which the
 * tests in this class rely on to stub outbound transport requests.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> inheritedPlugins = super.nodePlugins();
    return CollectionUtils.appendToCopyNoNullElements(inheritedPlugins, MockTransportService.TestPlugin.class);
}

/**
 * Builds the per-node settings: everything the superclass provides, plus leader-check tuning so that a
 * failed leader is detected quickly.
 */
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    final Settings.Builder builder = Settings.builder();
    builder.put(super.nodeSettings(nodeOrdinal, otherSettings));
    // detect leader failover quickly: a single retry, checked every 100ms
    builder.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1);
    builder.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms");
    return builder.build();
}

/**
 * Baseline scenario: a data-only node joins a healthy three-node cluster. Afterwards the master's cluster
 * state must contain one more node, one more data node, the same number of master nodes, and every
 * data node that was present before the join.
 */
public void testNodeJoinsCluster() {
    internalCluster().startNodes(3);
    final String masterNodeName = internalCluster().getMasterName();

    // Snapshot the cluster membership as seen by the master before the join
    final int initialNodeCount = internalCluster().clusterService(masterNodeName).state().getNodes().size();
    final int initialMasterNodeCount = internalCluster().clusterService(masterNodeName)
        .state()
        .nodes()
        .getMasterNodes()
        .size();
    final List<String> initialDataNodeNames = getListOfDataNodeNamesFromCluster(masterNodeName);

    // Add the new node and wait for a stable four-node cluster
    final String newNodeName = internalCluster().startDataOnlyNode();
    ensureStableCluster(4);

    // The membership counts must reflect exactly one additional data node
    final DiscoveryNodes nodesAfterJoin = internalCluster().clusterService(masterNodeName).state().nodes();
    assertEquals(initialNodeCount + 1, nodesAfterJoin.getSize());
    assertEquals(initialDataNodeNames.size() + 1, nodesAfterJoin.getDataNodes().size());
    assertEquals(initialMasterNodeCount, nodesAfterJoin.getMasterNodes().size());

    // The new node is present and none of the original data nodes were lost
    final List<String> dataNodeNamesAfterJoin = getListOfDataNodeNamesFromCluster(masterNodeName);
    assertTrue(dataNodeNamesAfterJoin.contains(newNodeName));
    assertTrue(dataNodeNamesAfterJoin.containsAll(initialDataNodeNames));
}

/*
In this scenario, node N attempts to join a cluster whose current master can no longer send publish requests, and a
different, pre-selected node is steered towards winning the subsequent election.
Node N should end up in every node's cluster state, and NodeJoinExecutor should not log any WARN message (#ES-11449)
*/
@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO")
public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
List<String> nodeNames = internalCluster().startNodes(3);
ensureStableCluster(3);
String originalMasterNodeName = internalCluster().getMasterName();
// Recorded before the join so the final node count can be compared against it
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size();
// Determine upfront who we want the next master to be
final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames));

// Ensure the logging is as expected
try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) {

// Sets MockTransportService behaviour
for (final var transportService : internalCluster().getInstances(TransportService.class)) {
final var mockTransportService = asInstanceOf(MockTransportService.class, transportService);

// Every node except the intended new master gets both actions blocked on its outbound requests
if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName) == false) {
List<String> listOfActionsToBlock = List.of(
// This forces the current master node to fail
PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
// This disables pre-voting on all nodes except the new master, forcing it to win the election
StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME
);
blockActionNameOnMockTransportService(mockTransportService, listOfActionsToBlock);
}
}

// We do not expect to see a WARN log about a node disconnecting (#ES-11449)
addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);

// We haven't changed master nodes yet
assertEquals(originalMasterNodeName, internalCluster().getMasterName());

// Sends a node join request to the original master node. This will fail, and cause a master failover
// startDataOnlyNode waits for the new node to be added, and this can only occur after a re-election
String newNodeName = internalCluster().startDataOnlyNode();
// NOTE(review): this only asserts that the master changed, not that newMasterNodeName is the one elected
assertNotEquals(originalMasterNodeName, internalCluster().getMasterName());
logger.info("New master is elected");

// Assert all nodes have accepted N into their cluster state
assertNewNodeIsInAllClusterStates(newNodeName);

// Fails the test if NodeJoinExecutor logged a WARN while the log was being captured
mockLog.assertAllExpectationsMatched();

// Assert the new data node was added
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName));
}
}

/*
In this scenario, node N attempts to join a cluster, there is an election and the original master is re-elected.
Node N should join the cluster, but it should not be disconnected (#ES-11449)
The re-election is verified by checking that the master name is unchanged while the coordination term has increased.
*/
@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO")
public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw this test is a really good discriminator of the improvement, it failed for me 9 out of 10 times I ran it having reverted the behaviour change in Coordinator, and passed 10 out of 10 with the change in place. Great stuff.

internalCluster().startNodes(3);
ensureStableCluster(3);
String masterNodeName = internalCluster().getMasterName();

// Recorded up front so that a later, larger term proves an election took place
long originalTerm = getTerm(masterNodeName);
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();

try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) {
for (String nodeName : internalCluster().getNodeNames()) {
final var mockTransportService = MockTransportService.getInstance(nodeName);

if (nodeName.equals(masterNodeName)) {
// This makes the master fail, forcing a re-election
blockActionNameOnMockTransportService(
mockTransportService,
List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME)
);

// Wait until the master has stepped down before removing the publishing ban
// This allows the master to be re-elected
// NOTE(review): presumably the temporary listener is removed once the predicate returns true - confirm in ClusterServiceUtils
ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> {
DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
// Stepped down means: no master at all, or a master other than the original one
boolean hasMasterSteppedDown = currentMasterNode == null
|| currentMasterNode.getName().equals(masterNodeName) == false;
if (hasMasterSteppedDown) {
logger.info("Master publishing ban removed");
// Installs a send behaviour that forwards every request to the connection unchanged
// (presumably replacing the blocking behaviour installed above - confirm in MockTransportService)
mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
}
return hasMasterSteppedDown;
});

} else {
// This disables pre-voting on all nodes except the master, forcing it to win the election
blockActionNameOnMockTransportService(
mockTransportService,
List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME)
);
}
}

// We expect the node join request to fail with a FailedToCommitClusterStateException
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"failed to commit cluster state",
MasterService.class.getCanonicalName(),
Level.WARN,
"failed to commit cluster state"
)
);

/*
We expect the cluster to reuse the connection to N and not disconnect it
Therefore, this WARN message should not be logged (#ES-11449)
*/
addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);

// Before we add the new node, assert we haven't changed master nodes yet
assertEquals(masterNodeName, internalCluster().getMasterName());

// Sends a node join request to the original master node. This will fail, and cause a master failover
logger.info("Sending node join request");
String newNodeName = internalCluster().startDataOnlyNode();

// Assert the master was re-elected
assertEquals(masterNodeName, internalCluster().getMasterName());
assertTrue(originalTerm < getTerm(masterNodeName));

// Assert all nodes have accepted N into their cluster state
assertNewNodeIsInAllClusterStates(newNodeName);

// If the WARN message was logged, then the connection to N was disconnected so fail the test
mockLog.assertAllExpectationsMatched();

// Assert the new data node was added
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName));
}
}

/**
 * Reads the coordination term from the cluster state currently held by the named node.
 *
 * @param masterNodeName name of the node whose cluster state is inspected
 * @return the term recorded in that state's coordination metadata
 */
private long getTerm(String masterNodeName) {
    final ClusterState currentState = internalCluster().clusterService(masterNodeName).state();
    return currentState.coordinationMetadata().term();
}

/**
 * Asserts that every node in the test cluster has a cluster state containing a node with the given name.
 * On failure the message identifies whose cluster state is missing the node and what that state contains.
 *
 * @param newNodeName name of the node expected to be present in every cluster state
 */
private void assertNewNodeIsInAllClusterStates(String newNodeName) {
    for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
        // Take a single snapshot so the check and the failure message describe the same state
        final DiscoveryNodes nodes = clusterService.state().nodes();
        final boolean containsNewNode = nodes.getAllNodes().stream().anyMatch(node -> node.getName().equals(newNodeName));
        assertTrue(
            "node [" + newNodeName + "] is missing from the cluster state held by [" + nodes.getLocalNode() + "]: " + nodes,
            containsNewNode
        );
    }
}

/**
 * Lists the names of all data nodes in the cluster state currently held by the given node.
 *
 * @param nodeName name of the node whose view of the cluster is used
 * @return the data node names, in the iteration order of that state's data-node map
 */
private List<String> getListOfDataNodeNamesFromCluster(String nodeName) {
    final DiscoveryNodes discoveryNodes = internalCluster().clusterService(nodeName).state().getNodes();
    final Collection<DiscoveryNode> dataNodes = discoveryNodes.getDataNodes().values();
    return dataNodes.stream().map(DiscoveryNode::getName).toList();
}

/**
 * Registers an expectation that {@link NodeJoinExecutor} logs no WARN message of any kind while the given
 * mock log is capturing. The tests use this to detect the joining node being disconnected (#ES-11449).
 *
 * @param mockLog the active log capture to add the expectation to
 */
private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) {
    mockLog.addExpectation(
        new MockLog.UnseenEventExpectation(
            "warn message with troubleshooting link",
            // Derive the logger name from the class: an unseen-event expectation on a stale, hard-coded name
            // would pass vacuously if the class were ever renamed or moved
            NodeJoinExecutor.class.getCanonicalName(),
            Level.WARN,
            // match any message
            "*"
        )
    );
}

/**
 * Installs a send behaviour on the given transport service that rejects outbound requests for the listed
 * actions by throwing an {@link ElasticsearchException}, and forwards all other requests unchanged.
 *
 * @param mockTransportService the transport service whose outbound requests are intercepted
 * @param actionNamesToBlock   names of the transport actions to reject
 */
private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List<String> actionNamesToBlock) {
    mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
        // Guard: refuse blocked actions before anything is sent
        if (actionNamesToBlock.contains(action)) {
            throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode());
        }
        connection.sendRequest(requestId, action, request, options);
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalMasterServiceTask;
Expand All @@ -39,6 +41,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.cluster.version.CompatibilityVersions;
Expand Down Expand Up @@ -190,6 +193,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
private final NodeHealthService nodeHealthService;
private final List<PeerFinderListener> peerFinderListeners;
private final LeaderHeartbeatService leaderHeartbeatService;
private final ClusterService clusterService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -218,7 +222,8 @@ public Coordinator(
LeaderHeartbeatService leaderHeartbeatService,
PreVoteCollector.Factory preVoteCollectorFactory,
CompatibilityVersions compatibilityVersions,
FeatureService featureService
FeatureService featureService,
ClusterService clusterService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -328,6 +333,7 @@ public Coordinator(
this.peerFinderListeners.add(clusterBootstrapService);
this.leaderHeartbeatService = leaderHeartbeatService;
this.compatibilityVersions = compatibilityVersions;
this.clusterService = clusterService;
}

/**
Expand Down Expand Up @@ -662,11 +668,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
@Override
public void onResponse(Releasable response) {
validateJoinRequest(
joinRequest,
ActionListener.runBefore(joinListener, () -> Releasables.close(response))
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))
);
SubscribableListener
// Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
.<Void>newForked(l -> validateJoinRequest(joinRequest, l))

// Adds the joining node to the cluster state
.<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
// #ES-11449
if (e instanceof FailedToCommitClusterStateException) {
// The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
// the next master may have already accepted the state that we just published and will therefore include the
// joining node in its future states too. Thus we need to wait for the next committed state before we know the
// eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
// the listener.

// NB we are on the master update thread here at the end of processing the failed cluster state update, so this
// all happens before any cluster state update that re-elects a master
assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);

final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final var discoveryNodes = event.state().nodes();
// Keep the connection open until the next committed state
if (discoveryNodes.getMasterNode() != null) {
// Remove this listener to avoid memory leaks
clusterService.removeListener(this);
if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
ll.onResponse(null);
} else {
ll.onFailure(e);
}
}
}
};
clusterService.addListener(clusterStateListener);
clusterStateListener.clusterChanged(
new ClusterChangedEvent(
"Checking if another master has been elected since "
+ joinRequest.getSourceNode().getName()
+ " attempted to join cluster",
clusterService.state(),
clusterService.state()
)
);
} else {
ll.onFailure(e);
}
})))

// Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.
.addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));
}

@Override
Expand Down
Loading