| 
19 | 19 | import org.elasticsearch.action.support.SubscribableListener;  | 
20 | 20 | import org.elasticsearch.action.support.ThreadedActionListener;  | 
21 | 21 | import org.elasticsearch.client.internal.Client;  | 
22 |  | -import org.elasticsearch.cluster.ClusterChangedEvent;  | 
23 | 22 | import org.elasticsearch.cluster.ClusterName;  | 
24 | 23 | import org.elasticsearch.cluster.ClusterState;  | 
25 |  | -import org.elasticsearch.cluster.ClusterStateListener;  | 
26 | 24 | import org.elasticsearch.cluster.ClusterStatePublicationEvent;  | 
27 | 25 | import org.elasticsearch.cluster.ClusterStateUpdateTask;  | 
28 | 26 | import org.elasticsearch.cluster.LocalMasterServiceTask;  | 
 | 
41 | 39 | import org.elasticsearch.cluster.routing.allocation.AllocationService;  | 
42 | 40 | import org.elasticsearch.cluster.service.ClusterApplier;  | 
43 | 41 | import org.elasticsearch.cluster.service.ClusterApplierService;  | 
44 |  | -import org.elasticsearch.cluster.service.ClusterService;  | 
45 | 42 | import org.elasticsearch.cluster.service.MasterService;  | 
46 | 43 | import org.elasticsearch.cluster.service.MasterServiceTaskQueue;  | 
47 | 44 | import org.elasticsearch.cluster.version.CompatibilityVersions;  | 
@@ -193,7 +190,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt  | 
193 | 190 |     private final NodeHealthService nodeHealthService;  | 
194 | 191 |     private final List<PeerFinderListener> peerFinderListeners;  | 
195 | 192 |     private final LeaderHeartbeatService leaderHeartbeatService;  | 
196 |  | -    private final ClusterService clusterService;  | 
197 | 193 | 
 
  | 
198 | 194 |     /**  | 
199 | 195 |      * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.  | 
@@ -222,8 +218,7 @@ public Coordinator(  | 
222 | 218 |         LeaderHeartbeatService leaderHeartbeatService,  | 
223 | 219 |         PreVoteCollector.Factory preVoteCollectorFactory,  | 
224 | 220 |         CompatibilityVersions compatibilityVersions,  | 
225 |  | -        FeatureService featureService,  | 
226 |  | -        ClusterService clusterService  | 
 | 221 | +        FeatureService featureService  | 
227 | 222 |     ) {  | 
228 | 223 |         this.settings = settings;  | 
229 | 224 |         this.transportService = transportService;  | 
@@ -333,7 +328,6 @@ public Coordinator(  | 
333 | 328 |         this.peerFinderListeners.add(clusterBootstrapService);  | 
334 | 329 |         this.leaderHeartbeatService = leaderHeartbeatService;  | 
335 | 330 |         this.compatibilityVersions = compatibilityVersions;  | 
336 |  | -        this.clusterService = clusterService;  | 
337 | 331 |     }  | 
338 | 332 | 
 
  | 
339 | 333 |     /**  | 
@@ -668,57 +662,11 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi  | 
668 | 662 |         transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {  | 
669 | 663 |             @Override  | 
670 | 664 |             public void onResponse(Releasable response) {  | 
671 |  | -                SubscribableListener  | 
672 |  | -                    // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?  | 
673 |  | -                    .<Void>newForked(l -> validateJoinRequest(joinRequest, l))  | 
674 |  | - | 
675 |  | -                    // Adds the joining node to the cluster state  | 
676 |  | -                    .<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {  | 
677 |  | -                        // #ES-11449  | 
678 |  | -                        if (e instanceof FailedToCommitClusterStateException) {  | 
679 |  | -                            // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:  | 
680 |  | -                            // the next master may have already accepted the state that we just published and will therefore include the  | 
681 |  | -                            // joining node in its future states too. Thus we need to wait for the next committed state before we know the  | 
682 |  | -                            // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete  | 
683 |  | -                            // the listener.  | 
684 |  | - | 
685 |  | -                            // NB we are on the master update thread here at the end of processing the failed cluster state update, so this  | 
686 |  | -                            // all happens before any cluster state update that re-elects a master  | 
687 |  | -                            assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);  | 
688 |  | - | 
689 |  | -                            final ClusterStateListener clusterStateListener = new ClusterStateListener() {  | 
690 |  | -                                @Override  | 
691 |  | -                                public void clusterChanged(ClusterChangedEvent event) {  | 
692 |  | -                                    final var discoveryNodes = event.state().nodes();  | 
693 |  | -                                    // Keep the connection open until the next committed state  | 
694 |  | -                                    if (discoveryNodes.getMasterNode() != null) {  | 
695 |  | -                                        // Remove this listener to avoid memory leaks  | 
696 |  | -                                        clusterService.removeListener(this);  | 
697 |  | -                                        if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {  | 
698 |  | -                                            ll.onResponse(null);  | 
699 |  | -                                        } else {  | 
700 |  | -                                            ll.onFailure(e);  | 
701 |  | -                                        }  | 
702 |  | -                                    }  | 
703 |  | -                                }  | 
704 |  | -                            };  | 
705 |  | -                            clusterService.addListener(clusterStateListener);  | 
706 |  | -                            clusterStateListener.clusterChanged(  | 
707 |  | -                                new ClusterChangedEvent(  | 
708 |  | -                                    "Checking if another master has been elected since "  | 
709 |  | -                                        + joinRequest.getSourceNode().getName()  | 
710 |  | -                                        + " attempted to join cluster",  | 
711 |  | -                                    clusterService.state(),  | 
712 |  | -                                    clusterService.state()  | 
713 |  | -                                )  | 
714 |  | -                            );  | 
715 |  | -                        } else {  | 
716 |  | -                            ll.onFailure(e);  | 
717 |  | -                        }  | 
718 |  | -                    })))  | 
719 |  | - | 
720 |  | -                    // Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.  | 
721 |  | -                    .addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));  | 
 | 665 | +                validateJoinRequest(  | 
 | 666 | +                    joinRequest,  | 
 | 667 | +                    ActionListener.runBefore(joinListener, () -> Releasables.close(response))  | 
 | 668 | +                        .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))  | 
 | 669 | +                );  | 
722 | 670 |             }  | 
723 | 671 | 
 
  | 
724 | 672 |             @Override  | 
 | 
0 commit comments