|  | 
| 19 | 19 | import org.elasticsearch.action.support.SubscribableListener; | 
| 20 | 20 | import org.elasticsearch.action.support.ThreadedActionListener; | 
| 21 | 21 | import org.elasticsearch.client.internal.Client; | 
| 22 |  | -import org.elasticsearch.cluster.ClusterChangedEvent; | 
| 23 | 22 | import org.elasticsearch.cluster.ClusterName; | 
| 24 | 23 | import org.elasticsearch.cluster.ClusterState; | 
| 25 |  | -import org.elasticsearch.cluster.ClusterStateListener; | 
| 26 | 24 | import org.elasticsearch.cluster.ClusterStatePublicationEvent; | 
| 27 | 25 | import org.elasticsearch.cluster.ClusterStateUpdateTask; | 
| 28 | 26 | import org.elasticsearch.cluster.LocalMasterServiceTask; | 
|  | 
| 41 | 39 | import org.elasticsearch.cluster.routing.allocation.AllocationService; | 
| 42 | 40 | import org.elasticsearch.cluster.service.ClusterApplier; | 
| 43 | 41 | import org.elasticsearch.cluster.service.ClusterApplierService; | 
| 44 |  | -import org.elasticsearch.cluster.service.ClusterService; | 
| 45 | 42 | import org.elasticsearch.cluster.service.MasterService; | 
| 46 | 43 | import org.elasticsearch.cluster.service.MasterServiceTaskQueue; | 
| 47 | 44 | import org.elasticsearch.cluster.version.CompatibilityVersions; | 
| @@ -193,7 +190,6 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt | 
| 193 | 190 |     private final NodeHealthService nodeHealthService; | 
| 194 | 191 |     private final List<PeerFinderListener> peerFinderListeners; | 
| 195 | 192 |     private final LeaderHeartbeatService leaderHeartbeatService; | 
| 196 |  | -    private final ClusterService clusterService; | 
| 197 | 193 | 
 | 
| 198 | 194 |     /** | 
| 199 | 195 |      * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. | 
| @@ -222,8 +218,7 @@ public Coordinator( | 
| 222 | 218 |         LeaderHeartbeatService leaderHeartbeatService, | 
| 223 | 219 |         PreVoteCollector.Factory preVoteCollectorFactory, | 
| 224 | 220 |         CompatibilityVersions compatibilityVersions, | 
| 225 |  | -        FeatureService featureService, | 
| 226 |  | -        ClusterService clusterService | 
|  | 221 | +        FeatureService featureService | 
| 227 | 222 |     ) { | 
| 228 | 223 |         this.settings = settings; | 
| 229 | 224 |         this.transportService = transportService; | 
| @@ -333,7 +328,6 @@ public Coordinator( | 
| 333 | 328 |         this.peerFinderListeners.add(clusterBootstrapService); | 
| 334 | 329 |         this.leaderHeartbeatService = leaderHeartbeatService; | 
| 335 | 330 |         this.compatibilityVersions = compatibilityVersions; | 
| 336 |  | -        this.clusterService = clusterService; | 
| 337 | 331 |     } | 
| 338 | 332 | 
 | 
| 339 | 333 |     /** | 
| @@ -668,57 +662,11 @@ private void handleJoinRequest(JoinRequest joinRequest, ActionListener<Void> joi | 
| 668 | 662 |         transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() { | 
| 669 | 663 |             @Override | 
| 670 | 664 |             public void onResponse(Releasable response) { | 
| 671 |  | -                SubscribableListener | 
| 672 |  | -                    // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings? | 
| 673 |  | -                    .<Void>newForked(l -> validateJoinRequest(joinRequest, l)) | 
| 674 |  | - | 
| 675 |  | -                    // Adds the joining node to the cluster state | 
| 676 |  | -                    .<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> { | 
| 677 |  | -                        // #ES-11449 | 
| 678 |  | -                        if (e instanceof FailedToCommitClusterStateException) { | 
| 679 |  | -                            // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed: | 
| 680 |  | -                            // the next master may have already accepted the state that we just published and will therefore include the | 
| 681 |  | -                            // joining node in its future states too. Thus we need to wait for the next committed state before we know the | 
| 682 |  | -                            // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete | 
| 683 |  | -                            // the listener. | 
| 684 |  | - | 
| 685 |  | -                            // NB we are on the master update thread here at the end of processing the failed cluster state update, so this | 
| 686 |  | -                            // all happens before any cluster state update that re-elects a master | 
| 687 |  | -                            assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME); | 
| 688 |  | - | 
| 689 |  | -                            final ClusterStateListener clusterStateListener = new ClusterStateListener() { | 
| 690 |  | -                                @Override | 
| 691 |  | -                                public void clusterChanged(ClusterChangedEvent event) { | 
| 692 |  | -                                    final var discoveryNodes = event.state().nodes(); | 
| 693 |  | -                                    // Keep the connection open until the next committed state | 
| 694 |  | -                                    if (discoveryNodes.getMasterNode() != null) { | 
| 695 |  | -                                        // Remove this listener to avoid memory leaks | 
| 696 |  | -                                        clusterService.removeListener(this); | 
| 697 |  | -                                        if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) { | 
| 698 |  | -                                            ll.onResponse(null); | 
| 699 |  | -                                        } else { | 
| 700 |  | -                                            ll.onFailure(e); | 
| 701 |  | -                                        } | 
| 702 |  | -                                    } | 
| 703 |  | -                                } | 
| 704 |  | -                            }; | 
| 705 |  | -                            clusterService.addListener(clusterStateListener); | 
| 706 |  | -                            clusterStateListener.clusterChanged( | 
| 707 |  | -                                new ClusterChangedEvent( | 
| 708 |  | -                                    "Checking if another master has been elected since " | 
| 709 |  | -                                        + joinRequest.getSourceNode().getName() | 
| 710 |  | -                                        + " attempted to join cluster", | 
| 711 |  | -                                    clusterService.state(), | 
| 712 |  | -                                    clusterService.state() | 
| 713 |  | -                                ) | 
| 714 |  | -                            ); | 
| 715 |  | -                        } else { | 
| 716 |  | -                            ll.onFailure(e); | 
| 717 |  | -                        } | 
| 718 |  | -                    }))) | 
| 719 |  | - | 
| 720 |  | -                    // Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node. | 
| 721 |  | -                    .addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response))); | 
|  | 665 | +                validateJoinRequest( | 
|  | 666 | +                    joinRequest, | 
|  | 667 | +                    ActionListener.runBefore(joinListener, () -> Releasables.close(response)) | 
|  | 668 | +                        .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l)) | 
|  | 669 | +                ); | 
| 722 | 670 |             } | 
| 723 | 671 | 
 | 
| 724 | 672 |             @Override | 
|  | 
0 commit comments