Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStatePublicationEvent;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.LocalMasterServiceTask;
Expand All @@ -41,7 +39,6 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
import org.elasticsearch.cluster.version.CompatibilityVersions;
Expand Down Expand Up @@ -193,7 +190,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
private final NodeHealthService nodeHealthService;
private final List<PeerFinderListener> peerFinderListeners;
private final LeaderHeartbeatService leaderHeartbeatService;
private final ClusterService clusterService;

/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
Expand Down Expand Up @@ -222,8 +218,7 @@ public Coordinator(
LeaderHeartbeatService leaderHeartbeatService,
PreVoteCollector.Factory preVoteCollectorFactory,
CompatibilityVersions compatibilityVersions,
FeatureService featureService,
ClusterService clusterService
FeatureService featureService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -333,7 +328,6 @@ public Coordinator(
this.peerFinderListeners.add(clusterBootstrapService);
this.leaderHeartbeatService = leaderHeartbeatService;
this.compatibilityVersions = compatibilityVersions;
this.clusterService = clusterService;
}

/**
Expand Down Expand Up @@ -668,57 +662,11 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi
transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
@Override
public void onResponse(Releasable response) {
SubscribableListener
// Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
.<Void>newForked(l -> validateJoinRequest(joinRequest, l))

// Adds the joining node to the cluster state
.<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
// #ES-11449
if (e instanceof FailedToCommitClusterStateException) {
// The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
// the next master may have already accepted the state that we just published and will therefore include the
// joining node in its future states too. Thus we need to wait for the next committed state before we know the
// eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
// the listener.

// NB we are on the master update thread here at the end of processing the failed cluster state update, so this
// all happens before any cluster state update that re-elects a master
assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);

final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final var discoveryNodes = event.state().nodes();
// Keep the connection open until the next committed state
if (discoveryNodes.getMasterNode() != null) {
// Remove this listener to avoid memory leaks
clusterService.removeListener(this);
if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
ll.onResponse(null);
} else {
ll.onFailure(e);
}
}
}
};
clusterService.addListener(clusterStateListener);
clusterStateListener.clusterChanged(
new ClusterChangedEvent(
"Checking if another master has been elected since "
+ joinRequest.getSourceNode().getName()
+ " attempted to join cluster",
clusterService.state(),
clusterService.state()
)
);
} else {
ll.onFailure(e);
}
})))

// Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.
.addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));
validateJoinRequest(
joinRequest,
ActionListener.runBefore(joinListener, () -> Releasables.close(response))
.delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))
);
}

@Override
Expand Down
Loading