Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
21d7b30
Don't fail on first node-join publication failure
joshua-adams-1 Jul 28, 2025
829c69f
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Jul 28, 2025
fb05a2a
Checkstyle
joshua-adams-1 Jul 28, 2025
962b3b2
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Jul 28, 2025
4b97f07
Adds Node Joining Integration Tests
joshua-adams-1 Aug 7, 2025
2a541b0
Adds nodeExistsWithName method
joshua-adams-1 Aug 7, 2025
eb572d4
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 7, 2025
3e501b4
Minor tweaks
joshua-adams-1 Aug 7, 2025
b6ddfba
Uncomments out solution, and makes NodeJoiningIT
joshua-adams-1 Aug 11, 2025
dac102a
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 11, 2025
d269e47
Ran ./gradlew spotlessApply precommit
joshua-adams-1 Aug 11, 2025
a668ab1
Modify Coordinator Logic
joshua-adams-1 Aug 12, 2025
17e6bd6
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 14, 2025
cc67155
Generalise logging expectations
joshua-adams-1 Aug 14, 2025
d47915b
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 14, 2025
f7cbcbd
[CI] Auto commit changes from spotless
Aug 14, 2025
c989fd0
Add YAML test for "missing lookup key" scenario (#132870)
smalyshev Aug 14, 2025
c1b1ef2
Add memory accounting to exponential histogram library. (#132580)
JonasKunz Aug 14, 2025
6708bb4
Vectorize BQVectorUtils#packAsBinary (#132923)
iverase Aug 14, 2025
4150643
Mute org.elasticsearch.xpack.esql.qa.mixed.EsqlClientYamlIT test {p0=…
elasticsearchmachine Aug 14, 2025
170a518
Remove mutes for resolved CsvTests issues (#132924)
idegtiarenko Aug 14, 2025
2ee0871
ESQL - Allow null values in vector similarity functions (#132919)
carlosdelest Aug 14, 2025
5eec40f
Vectorize BQSpaceUtils#transposeHalfByte (#132935)
iverase Aug 14, 2025
c27fe6a
Send max of two types of max queue latency to ClusterInfo (#132675)
DiannaHohensee Aug 14, 2025
15d6693
[DiskBBQ] Replace n_probe, related to the number of centroids with v…
iverase Aug 14, 2025
6c3fadc
[ML] Add spec files for Llama and AI21 (#132724)
jonathan-buttner Aug 14, 2025
f45e1ad
Remove awaits for closed issues (#132306)
smalyshev Aug 14, 2025
b238367
Suppport per-project behavior in ESQL extra verifiers (#131884)
mark-vieira Aug 14, 2025
36c9f02
Add random tests with match_only_text multi-field (#132380)
parkertimmins Aug 14, 2025
ba8c5e6
Store ignored source in unique stored fields per entry (#132142)
jordan-powers Aug 14, 2025
4a8f3f7
Rename skipping logic to remove hard link to skip_unavailable (#132861)
smalyshev Aug 14, 2025
cdfdb5e
Mute org.elasticsearch.index.mapper.LongFieldMapperTests testFetchMan…
elasticsearchmachine Aug 14, 2025
a4ac7fb
Adding simulate ingest effective mapping (#132833)
masseyke Aug 14, 2025
cbbacc0
Precompute the BitsetCacheKey hashCode (#132875)
joegallo Aug 14, 2025
c0a079f
Fix failing UT by adding a required capability (#132947)
julian-elastic Aug 14, 2025
3337cf2
Mute org.elasticsearch.index.mapper.LongFieldMapperTests testFetch #1…
elasticsearchmachine Aug 14, 2025
57e5887
Remove CrossClusterCancellationIT.createLocalIndex() (#132952)
JeremyDahlgren Aug 14, 2025
25333ca
Unmuting simulate index data stream mapping overrides yaml rest test …
masseyke Aug 14, 2025
433b827
Mute org.elasticsearch.cluster.ClusterInfoServiceIT testMaxQueueLaten…
elasticsearchmachine Aug 14, 2025
9203679
Introduce execution location marker for better handling of remote/loc…
smalyshev Aug 14, 2025
e6b86ef
Implement v_magnitude function (#132765)
svilen-mihaylov-elastic Aug 14, 2025
a180eaf
Breakdown undesired allocations by shard routing role (#132235)
nicktindall Aug 14, 2025
57db61c
Switch to PR-based benchmark pipeline defined in ES repo (#132941)
gbanasiak Aug 15, 2025
af0c58e
Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {…
elasticsearchmachine Aug 15, 2025
b7922ff
Implement WriteLoadConstraintDecider#canAllocate (#132041)
DiannaHohensee Aug 15, 2025
6485e97
Simplify EsqlSession (#132848)
idegtiarenko Aug 15, 2025
1d35bd3
Mute org.elasticsearch.index.mapper.LongFieldMapperTests testSyntheti…
elasticsearchmachine Aug 15, 2025
332a86c
Speed up loading keyword fields with index sorts (#132950)
dnhatn Aug 15, 2025
98be4ad
Merge remote-tracking branch 'upstream/main' into master-node-disconnect
joshua-adams-1 Aug 15, 2025
0e227f7
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 15, 2025
00b27f3
Remove logger.info
joshua-adams-1 Aug 15, 2025
8c4de89
David Turner Comments
joshua-adams-1 Aug 19, 2025
d5141bc
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 19, 2025
ed73225
Fix unit tests
joshua-adams-1 Aug 20, 2025
a7b2f0e
Merge branch 'master-node-disconnect' of github.com:joshua-adams-1/el…
joshua-adams-1 Aug 20, 2025
ad45d19
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 20, 2025
0b4560f
Comments
joshua-adams-1 Aug 22, 2025
aa82afe
Unused method
joshua-adams-1 Aug 22, 2025
8323493
Merge branch 'master-node-disconnect' of github.com:joshua-adams-1/el…
joshua-adams-1 Aug 22, 2025
ff5b4ec
[CI] Auto commit changes from spotless
Aug 22, 2025
8ad70a2
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 22, 2025
8316b56
Changes
joshua-adams-1 Aug 22, 2025
8bf17e5
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 26, 2025
3f46e13
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Aug 27, 2025
75b118a
David Comments
joshua-adams-1 Sep 1, 2025
c9f608f
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Sep 1, 2025
2f493f5
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Sep 2, 2025
dc7eb96
Merge branch 'master-node-disconnect' of github.com:joshua-adams-1/el…
joshua-adams-1 Sep 2, 2025
3683676
David Comments
joshua-adams-1 Sep 2, 2025
cd8502a
Merge branch 'main' into master-node-disconnect
joshua-adams-1 Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@

import org.apache.logging.log4j.Level;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -85,18 +83,8 @@ public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
ensureStableCluster(3);
String originalMasterNodeName = internalCluster().getMasterName();
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName).state().getNodes().size();
int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName)
.state()
.nodes()
.getMasterNodes()
.size();
int numberOfDataNodesOriginallyInCluster = internalCluster().clusterService(originalMasterNodeName)
.state()
.nodes()
.getDataNodes()
.size();
// Determine upfront who we want the next master to be
final var newMasterNodeName = nodeNames.getFirst();
final var newMasterNodeName = randomValueOtherThan(originalMasterNodeName, () -> randomFrom(nodeNames));

// Ensure the logging is as expected
try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) {
Expand Down Expand Up @@ -128,14 +116,15 @@ public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
assertNotEquals(originalMasterNodeName, internalCluster().getMasterName());
logger.info("New master is elected");

// Assert all nodes have accepted N into their cluster state
assertNewNodeIsInAllClusterStates(newNodeName);

mockLog.assertAllExpectationsMatched();

// Assert the new data node was added
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
assertEquals(numberOfDataNodesOriginallyInCluster + 1, discoveryNodes.getDataNodes().size());
assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(newNodeName));
assertEquals(numberOfMasterNodesOriginallyInCluster, discoveryNodes.getMasterNodes().size());
}
}

Expand All @@ -149,20 +138,10 @@ public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {
ensureStableCluster(3);
String masterNodeName = internalCluster().getMasterName();

long originalTerm = internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term();
long originalTerm = getTerm(masterNodeName);
int numberOfNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().getNodes().size();
int numberOfMasterNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName)
.state()
.nodes()
.getMasterNodes()
.size();
int numberOfDataNodesOriginallyInCluster = internalCluster().clusterService(masterNodeName).state().nodes().getDataNodes().size();
String[] namesOfAllNodesInOriginalCluster = internalCluster().getNodeNames();

// Ensure the logging is as expected
try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) {
SubscribableListener<Void> publishingBanRemovedListener = null;

for (String nodeName : internalCluster().getNodeNames()) {
final var mockTransportService = MockTransportService.getInstance(nodeName);

Expand All @@ -175,18 +154,16 @@ public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {

// Wait until the master has stepped down before removing the publishing ban
// This allows the master to be re-elected
publishingBanRemovedListener = ClusterServiceUtils.addTemporaryStateListener(
internalCluster().clusterService(masterNodeName),
clusterState -> {
DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
boolean hasMasterSteppedDown = currentMasterNode == null
|| currentMasterNode.getName().equals(masterNodeName) == false;
if (hasMasterSteppedDown) {
mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
}
return hasMasterSteppedDown;
ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterNodeName), clusterState -> {
DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
boolean hasMasterSteppedDown = currentMasterNode == null
|| currentMasterNode.getName().equals(masterNodeName) == false;
if (hasMasterSteppedDown) {
logger.info("Master publishing ban removed");
mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
}
);
return hasMasterSteppedDown;
});

} else {
// This disables pre-voting on all nodes except the master, forcing it to win the election
Expand Down Expand Up @@ -220,45 +197,29 @@ Therefore, this WARN log should not be thrown (#ES-11449)
logger.info("Sending node join request");
String newNodeName = internalCluster().startDataOnlyNode();

safeAwait(publishingBanRemovedListener);
logger.info("Master publishing ban removed");
// Assert the master was re-elected
assertTrue(masterNodeName.equals(internalCluster().getMasterName()) && originalTerm < getTerm(masterNodeName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: slight preference for this being two assertThat calls: if this fails, we get no feedback about why, whereas if we use assertThat twice then we'll be able to see whether it was the master that changed (and it'll identify the new master) or whether the term didn't increase.


// Wait until the master acknowledges its re-election. The master is only re-elected once it's publishing ban is lifted
SubscribableListener<Void> masterKnowsItsReElectedListener = ClusterServiceUtils.addTemporaryStateListener(
internalCluster().getInstance(ClusterService.class, masterNodeName),
clusterState -> {
DiscoveryNode currentMasterNode = clusterState.nodes().getMasterNode();
long currentTerm = clusterState.coordinationMetadata().term();
return currentMasterNode != null && currentMasterNode.getName().equals(masterNodeName) && currentTerm > originalTerm;
}
);
safeAwait(masterKnowsItsReElectedListener);

assertEquals(masterNodeName, internalCluster().getMasterName());
logger.info("Master has been re-elected");

try {
// Await for N to be in the cluster state of all nodes
for (String nodeName : namesOfAllNodesInOriginalCluster) {
ClusterServiceUtils.awaitClusterState(
logger,
clusterState -> nodeExistsWithName(clusterState.nodes(), newNodeName),
internalCluster().clusterService(nodeName)
);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
// Assert all nodes have accepted N into their cluster state
assertNewNodeIsInAllClusterStates(newNodeName);

// If the WARN log was thrown, then the connection to N was disconnected so fail the test
mockLog.assertAllExpectationsMatched();

// Assert the new data node was added
DiscoveryNodes discoveryNodes = internalCluster().clusterService().state().nodes();
assertEquals(numberOfNodesOriginallyInCluster + 1, discoveryNodes.getSize());
assertEquals(numberOfDataNodesOriginallyInCluster + 1, discoveryNodes.getDataNodes().size());
assertTrue(getListOfDataNodeNamesFromCluster(masterNodeName).contains(newNodeName));
assertEquals(numberOfMasterNodesOriginallyInCluster, discoveryNodes.getMasterNodes().size());
}
}

private long getTerm(String masterNodeName) {
return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term();
}

private void assertNewNodeIsInAllClusterStates(String newNodeName) {
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName));
}
}

Expand All @@ -273,25 +234,13 @@ private List<String> getListOfDataNodeNamesFromCluster(String nodeName) {
.toList();
}

private boolean nodeExistsWithName(DiscoveryNodes nodes, String nodeName) {
for (DiscoveryNode node : nodes.getAllNodes()) {
if (node.getName().equals(nodeName)) {
return true;
}
}
return false;
}

private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) {
mockLog.addExpectation(
new MockLog.UnseenEventExpectation(
"warn message with troubleshooting link",
"org.elasticsearch.cluster.coordination.NodeJoinExecutor",
Level.WARN,
"node-join: [*] with reason ["
+ ReferenceDocs.UNSTABLE_CLUSTER_TROUBLESHOOTING
+ "]; for troubleshooting guidance, see "
+ "https://www.elastic.co/docs/troubleshoot/elasticsearch/troubleshooting-unstable-cluster*"
"*"
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,51 +668,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
@Override
public void onResponse(Releasable response) {
validateJoinRequest(
joinRequest,
ActionListener.runBefore(joinListener, () -> Releasables.close(response))
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l.delegateResponse((ignoredListener, e) -> {

/*
This prevents a corner case, explained in #ES-11449, occurring as follows:
- Master M is in term T and has cluster state (T, V).
- Node N tries to join the cluster.
- M proposes cluster state (T, V+1) with N in the cluster.
- M accepts its own proposal and commits it to disk.
- M receives no responses. M doesn't know whether the state was accepted by a majority of nodes,
rejected, or did not reach any nodes.
- There is a re-election and M wins. M publishes cluster state (T+1, V+2).
Since it's built from the cluster state on disk, N is still in the cluster.
- Since (T, V+1) failed, a FailedToCommitClusterStateException is thrown and N's connection is dropped,
even though its inclusion in the cluster may have been committed on a majority of master nodes.
- It can rejoin, but this throws a WARN log since it did not restart.

The above situation occurs here when a FailedToCommitClusterStateException is thrown.
When we catch this exception, we keep the connection open until the next cluster state update.
N is accepted -> By waiting, we did not close the connection to N unnecessarily
N is rejected -> A new cluster state is published without N in it. Closing is correct here.
*/
if (e instanceof FailedToCommitClusterStateException) {
ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
// Keep the connection open until the next committed state
if (event.state().nodes().getMasterNode() != null) {
Releasables.close(response);
// Remove this listener to avoid memory leaks
clusterService.removeListener(this);
joinListener.onResponse(null);
SubscribableListener
// Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
.<Void>newForked(l -> validateJoinRequest(joinRequest, l))

// Adds the joining node to the cluster state
.<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
// #ES-11449
if (e instanceof FailedToCommitClusterStateException) {
// The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
// the next master may have already accepted the state that we just published and will therefore include the
// joining node in its future states too. Thus we need to wait for the next committed state before we know the
// eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
// the listener.

// NB we are on the master update thread here at the end of processing the failed cluster state update, so this
// all happens before any cluster state update that re-elects a master
assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);

final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final var discoveryNodes = event.state().nodes();
// Keep the connection open until the next committed state
if (discoveryNodes.getMasterNode() != null) {
// Remove this listener to avoid memory leaks
clusterService.removeListener(this);

if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
ll.onResponse(null);
} else {
ll.onFailure(e);
}
}
};
}
};
clusterService.addListener(clusterStateListener);

clusterService.addListener(clusterStateListener);
} else {
Releasables.close(response);
joinListener.onFailure(e);
// Immediate condition check in case another node is elected master
if (clusterService.state().nodes().nodeExists(joinRequest.getSourceNode().getId())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to fail the join listener and remove the cluster-state listener if the last-applied state is committed (i.e. has a master node) and doesn't include the joining node.

Does it work to call clusterStateListener.clusterChanged(new ClusterChangedEvent("", clusterService.state(), clusterService.state()))?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work

Hmm no, not quite: that might complete ll twice which is generally something we should try and avoid. It turns out that as things are implemented today ll will be a SubscribableListener which has well-defined semantics when completed multiple times:

* If this listener is completed more than once then all results other than the first (whether successful or otherwise) are silently
* discarded. All subscribed listeners will be notified of the same result, exactly once, even if several completions occur concurrently.

However we can't rely on that being true in future, it's not guaranteed that ll will always be a SubscribableListener in this context. I'd prefer we explicitly deduplicated this work e.g. by creating another SubscribableListener.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However we can't rely on that being true in future

I contemplated making it not be true by adding a check that these listeners are not completed multiple times:

diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
index 9eddbb55b776..b7c8c4a28279 100644
--- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
+++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java
@@ -135,7 +135,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
      */
     public static <T> SubscribableListener<T> newForked(CheckedConsumer<ActionListener<T>, ? extends Exception> fork) {
         final var listener = new SubscribableListener<T>();
-        ActionListener.run(listener, fork::accept);
+        ActionListener.run(ActionListener.assertOnce(listener), fork::accept);
         return listener;
     }

However, on reflection this seems unnecessarily strict and indeed it causes the SubscribableListener test suite to fail because we actually already assert that newForked and andThen receive the returned listener instance, which is therefore safe to complete more than once. I think it's best to document this fact, see #133391, and then we can rely on it here too (so disregard my previous message)

// Remove this listener to avoid memory leaks
clusterService.removeListener(clusterStateListener);

ll.onResponse(null);
}
})))
);
} else {
ll.onFailure(e);
}
})))

// Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.
.addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));
}

@Override
Expand Down