-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Master node disconnect #132023
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Master node disconnect #132023
Changes from 8 commits
21d7b30
829c69f
fb05a2a
962b3b2
4b97f07
2a541b0
eb572d4
3e501b4
b6ddfba
dac102a
d269e47
a668ab1
17e6bd6
cc67155
d47915b
f7cbcbd
c989fd0
c1b1ef2
6708bb4
4150643
170a518
2ee0871
5eec40f
c27fe6a
15d6693
6c3fadc
f45e1ad
b238367
36c9f02
ba8c5e6
4a8f3f7
cdfdb5e
a4ac7fb
cbbacc0
c0a079f
3337cf2
57e5887
25333ca
433b827
9203679
e6b86ef
a180eaf
57db61c
af0c58e
b7922ff
6485e97
1d35bd3
332a86c
98be4ad
0e227f7
00b27f3
8c4de89
d5141bc
ed73225
a7b2f0e
ad45d19
0b4560f
aa82afe
8323493
ff5b4ec
8ad70a2
8316b56
8bf17e5
3f46e13
75b118a
c9f608f
2f493f5
dc7eb96
3683676
cd8502a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -668,51 +668,57 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi | |
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() { | ||
@Override | ||
public void onResponse(Releasable response) { | ||
validateJoinRequest( | ||
joinRequest, | ||
ActionListener.runBefore(joinListener, () -> Releasables.close(response)) | ||
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l.delegateResponse((ignoredListener, e) -> { | ||
|
||
/* | ||
This prevents a corner case, explained in #ES-11449, occurring as follows: | ||
- Master M is in term T and has cluster state (T, V). | ||
- Node N tries to join the cluster. | ||
- M proposes cluster state (T, V+1) with N in the cluster. | ||
- M accepts its own proposal and commits it to disk. | ||
- M receives no responses. M doesn't know whether the state was accepted by a majority of nodes, | ||
rejected, or did not reach any nodes. | ||
- There is a re-election and M wins. M publishes cluster state (T+1, V+2). | ||
Since it's built from the cluster state on disk, N is still in the cluster. | ||
- Since (T, V+1) failed, a FailedToCommitClusterStateException is thrown and N's connection is dropped, | ||
even though its inclusion in the cluster may have been committed on a majority of master nodes. | ||
- It can rejoin, but this throws a WARN log since it did not restart. | ||
|
||
The above situation occurs here when a FailedToCommitClusterStateException is thrown. | ||
When we catch this exception, we keep the connection open until the next cluster state update. | ||
N is accepted -> By waiting, we did not close the connection to N unnecessarily | ||
N is rejected -> A new cluster state is published without N in it. Closing is correct here. | ||
*/ | ||
if (e instanceof FailedToCommitClusterStateException) { | ||
ClusterStateListener clusterStateListener = new ClusterStateListener() { | ||
@Override | ||
public void clusterChanged(ClusterChangedEvent event) { | ||
// Keep the connection open until the next committed state | ||
if (event.state().nodes().getMasterNode() != null) { | ||
Releasables.close(response); | ||
// Remove this listener to avoid memory leaks | ||
clusterService.removeListener(this); | ||
joinListener.onResponse(null); | ||
SubscribableListener | ||
// Validates the join request: can the remote node deserialize our cluster state and does it respond to pings? | ||
.<Void>newForked(l -> validateJoinRequest(joinRequest, l)) | ||
|
||
// Adds the joining node to the cluster state | ||
.<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> { | ||
// #ES-11449 | ||
if (e instanceof FailedToCommitClusterStateException) { | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed: | ||
// the next master may have already accepted the state that we just published and will therefore include the | ||
// joining node in its future states too. Thus we need to wait for the next committed state before we know the | ||
// eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete | ||
// the listener. | ||
|
||
// NB we are on the master update thread here at the end of processing the failed cluster state update, so this | ||
// all happens before any cluster state update that re-elects a master | ||
assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME); | ||
|
||
final ClusterStateListener clusterStateListener = new ClusterStateListener() { | ||
@Override | ||
public void clusterChanged(ClusterChangedEvent event) { | ||
final var discoveryNodes = event.state().nodes(); | ||
// Keep the connection open until the next committed state | ||
if (discoveryNodes.getMasterNode() != null) { | ||
// Remove this listener to avoid memory leaks | ||
clusterService.removeListener(this); | ||
|
||
if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) { | ||
ll.onResponse(null); | ||
} else { | ||
ll.onFailure(e); | ||
} | ||
} | ||
}; | ||
|
||
clusterService.addListener(clusterStateListener); | ||
} else { | ||
Releasables.close(response); | ||
joinListener.onFailure(e); | ||
} | ||
}; | ||
clusterService.addListener(clusterStateListener); | ||
|
||
// Another node was elected, and doesn't have the node in it | ||
if (clusterService.state().nodes().getMasterNode() != null | ||
&& clusterService.state().nodes().nodeExists(joinRequest.getSourceNode().getId()) == false) { | ||
// Remove this listener to avoid memory leaks | ||
clusterService.removeListener(clusterStateListener); | ||
ll.onFailure(e); | ||
} | ||
|
||
}))) | ||
); | ||
} else { | ||
ll.onFailure(e); | ||
} | ||
}))) | ||
|
||
// Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node. | ||
.addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response))); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: slight preference for this being two `assertThat` calls: if this fails, we get no feedback about why, whereas if we use `assertThat` twice then we'll be able to see whether it was the master that changed (and it'll identify the new master) or whether the term didn't increase.