|
19 | 19 | import org.elasticsearch.action.support.SubscribableListener; |
20 | 20 | import org.elasticsearch.action.support.ThreadedActionListener; |
21 | 21 | import org.elasticsearch.client.internal.Client; |
22 | | -import org.elasticsearch.cluster.ClusterChangedEvent; |
23 | 22 | import org.elasticsearch.cluster.ClusterName; |
24 | 23 | import org.elasticsearch.cluster.ClusterState; |
25 | | -import org.elasticsearch.cluster.ClusterStateListener; |
26 | 24 | import org.elasticsearch.cluster.ClusterStatePublicationEvent; |
27 | 25 | import org.elasticsearch.cluster.ClusterStateUpdateTask; |
28 | 26 | import org.elasticsearch.cluster.LocalMasterServiceTask; |
|
41 | 39 | import org.elasticsearch.cluster.routing.allocation.AllocationService; |
42 | 40 | import org.elasticsearch.cluster.service.ClusterApplier; |
43 | 41 | import org.elasticsearch.cluster.service.ClusterApplierService; |
44 | | -import org.elasticsearch.cluster.service.ClusterService; |
45 | 42 | import org.elasticsearch.cluster.service.MasterService; |
46 | 43 | import org.elasticsearch.cluster.service.MasterServiceTaskQueue; |
47 | 44 | import org.elasticsearch.cluster.version.CompatibilityVersions; |
@@ -193,7 +190,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt |
193 | 190 | private final NodeHealthService nodeHealthService; |
194 | 191 | private final List<PeerFinderListener> peerFinderListeners; |
195 | 192 | private final LeaderHeartbeatService leaderHeartbeatService; |
196 | | - private final ClusterService clusterService; |
197 | 193 |
|
198 | 194 | /** |
199 | 195 | * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. |
@@ -222,8 +218,7 @@ public Coordinator( |
222 | 218 | LeaderHeartbeatService leaderHeartbeatService, |
223 | 219 | PreVoteCollector.Factory preVoteCollectorFactory, |
224 | 220 | CompatibilityVersions compatibilityVersions, |
225 | | - FeatureService featureService, |
226 | | - ClusterService clusterService |
| 221 | + FeatureService featureService |
227 | 222 | ) { |
228 | 223 | this.settings = settings; |
229 | 224 | this.transportService = transportService; |
@@ -333,7 +328,6 @@ public Coordinator( |
333 | 328 | this.peerFinderListeners.add(clusterBootstrapService); |
334 | 329 | this.leaderHeartbeatService = leaderHeartbeatService; |
335 | 330 | this.compatibilityVersions = compatibilityVersions; |
336 | | - this.clusterService = clusterService; |
337 | 331 | } |
338 | 332 |
|
339 | 333 | /** |
@@ -668,57 +662,11 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi |
668 | 662 | transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() { |
669 | 663 | @Override |
670 | 664 | public void onResponse(Releasable response) { |
671 | | - SubscribableListener |
672 | | - // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings? |
673 | | - .<Void>newForked(l -> validateJoinRequest(joinRequest, l)) |
674 | | - |
675 | | - // Adds the joining node to the cluster state |
676 | | - .<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> { |
677 | | - // #ES-11449 |
678 | | - if (e instanceof FailedToCommitClusterStateException) { |
679 | | - // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed: |
680 | | - // the next master may have already accepted the state that we just published and will therefore include the |
681 | | - // joining node in its future states too. Thus we need to wait for the next committed state before we know the |
682 | | - // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete |
683 | | - // the listener. |
684 | | - |
685 | | - // NB we are on the master update thread here at the end of processing the failed cluster state update, so this |
686 | | - // all happens before any cluster state update that re-elects a master |
687 | | - assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME); |
688 | | - |
689 | | - final ClusterStateListener clusterStateListener = new ClusterStateListener() { |
690 | | - @Override |
691 | | - public void clusterChanged(ClusterChangedEvent event) { |
692 | | - final var discoveryNodes = event.state().nodes(); |
693 | | - // Keep the connection open until the next committed state |
694 | | - if (discoveryNodes.getMasterNode() != null) { |
695 | | - // Remove this listener to avoid memory leaks |
696 | | - clusterService.removeListener(this); |
697 | | - if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) { |
698 | | - ll.onResponse(null); |
699 | | - } else { |
700 | | - ll.onFailure(e); |
701 | | - } |
702 | | - } |
703 | | - } |
704 | | - }; |
705 | | - clusterService.addListener(clusterStateListener); |
706 | | - clusterStateListener.clusterChanged( |
707 | | - new ClusterChangedEvent( |
708 | | - "Checking if another master has been elected since " |
709 | | - + joinRequest.getSourceNode().getName() |
710 | | - + " attempted to join cluster", |
711 | | - clusterService.state(), |
712 | | - clusterService.state() |
713 | | - ) |
714 | | - ); |
715 | | - } else { |
716 | | - ll.onFailure(e); |
717 | | - } |
718 | | - }))) |
719 | | - |
720 | | - // Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node. |
721 | | - .addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response))); |
| 665 | + validateJoinRequest( |
| 666 | + joinRequest, |
| 667 | + ActionListener.runBefore(joinListener, () -> Releasables.close(response)) |
| 668 | + .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) |
| 669 | + ); |
722 | 670 | } |
723 | 671 |
|
724 | 672 | @Override |
|
0 commit comments