52
52
import org .elasticsearch .common .settings .Settings ;
53
53
import org .elasticsearch .common .transport .TransportAddress ;
54
54
import org .elasticsearch .common .util .concurrent .EsExecutors ;
55
- import org .elasticsearch .common .util .concurrent .ListenableFuture ;
56
55
import org .elasticsearch .common .xcontent .ChunkedToXContent ;
57
56
import org .elasticsearch .common .xcontent .XContentHelper ;
58
57
import org .elasticsearch .core .FixForMultiProject ;
@@ -355,9 +354,9 @@ private void onLeaderFailure(Supplier<String> message, Exception e) {
355
354
assert lastKnownLeader .isPresent ();
356
355
if (logger .isDebugEnabled ()) {
357
356
// TODO this is a workaround for log4j's Supplier. We should remove this, once using ES logging api
358
- logger .info (() -> message . get () , e );
357
+ logger .info (message :: get , e );
359
358
} else {
360
- logger .info (() -> message . get () );
359
+ logger .info (message :: get );
361
360
}
362
361
}
363
362
becomeCandidate ("onLeaderFailure" );
@@ -514,7 +513,7 @@ private void closePrevotingRound() {
514
513
515
514
/**
516
515
* Updates {@link #maxTermSeen} if greater.
517
- *
516
+ * <p>
518
517
* Every time a new term is found, either from another node requesting election, or this node trying to run for election, always update
519
518
* the max term number. The max term may not reflect an actual election, but rather an election attempt by some node in the
520
519
* cluster.
@@ -524,7 +523,7 @@ private void updateMaxTermSeen(final long term) {
524
523
maxTermSeen = Math .max (maxTermSeen , term );
525
524
final long currentTerm = getCurrentTerm ();
526
525
if (mode == Mode .LEADER && maxTermSeen > currentTerm ) {
527
- // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
526
+ // Bump our term. However, if there is a publication in flight then doing so would cancel the publication, so don't do that
528
527
// since we check whether a term bump is needed at the end of the publication too.
529
528
if (publicationInProgress ()) {
530
529
logger .debug ("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump" , maxTermSeen , currentTerm );
@@ -794,8 +793,8 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> jo
794
793
synchronized (mutex ) {
795
794
updateMaxTermSeen (joinRequest .getTerm ());
796
795
797
- final CoordinationState coordState = coordinationState .get ();
798
- final boolean prevElectionWon = coordState .electionWon ()
796
+ final CoordinationState localCoordinationState = coordinationState .get ();
797
+ final boolean previousElectionWon = localCoordinationState .electionWon ()
799
798
&& optionalJoin .stream ().allMatch (j -> j .term () <= getCurrentTerm ());
800
799
801
800
optionalJoin .ifPresent (this ::handleJoin );
@@ -806,7 +805,7 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> jo
806
805
joinListener
807
806
);
808
807
809
- if (prevElectionWon == false && coordState .electionWon ()) {
808
+ if (previousElectionWon == false && localCoordinationState .electionWon ()) {
810
809
becomeLeader ();
811
810
}
812
811
}
@@ -1616,7 +1615,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
1616
1615
clusterStatePublicationEvent ,
1617
1616
publishRequest ,
1618
1617
publicationContext ,
1619
- new ListenableFuture <>(),
1618
+ new SubscribableListener <>(),
1620
1619
ackListener ,
1621
1620
publishListener
1622
1621
);
@@ -1653,7 +1652,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
1653
1652
private boolean assertPreviousStateConsistency (ClusterStatePublicationEvent clusterStatePublicationEvent ) {
1654
1653
if (clusterStatePublicationEvent .getOldState () != coordinationState .get ().getLastAcceptedState ()) {
1655
1654
// compare JSON representations
1656
- @ FixForMultiProject // this is just so the toXContent doesnt throw - we want the same contents, but dont care if it's MP or not
1655
+ @ FixForMultiProject // this is just so toXContent doesn't throw - we want the same contents, but don't care if it's MP or not
1657
1656
ToXContent .Params params = new ToXContent .MapParams (Map .of ("multi-project" , "true" ));
1658
1657
1659
1658
String oldState = Strings .toString (ChunkedToXContent .wrapAsToXContent (clusterStatePublicationEvent .getOldState ()), params );
@@ -1669,7 +1668,7 @@ private boolean assertPreviousStateConsistency(ClusterStatePublicationEvent clus
1669
1668
}
1670
1669
1671
1670
private <T > ActionListener <T > wrapWithMutex (ActionListener <T > listener ) {
1672
- return new ActionListener <T >() {
1671
+ return new ActionListener <>() {
1673
1672
@ Override
1674
1673
public void onResponse (T t ) {
1675
1674
synchronized (mutex ) {
@@ -1688,9 +1687,7 @@ public void onFailure(Exception e) {
1688
1687
1689
1688
private void cancelActivePublication (String reason ) {
1690
1689
assert Thread .holdsLock (mutex ) : "Coordinator mutex not held" ;
1691
- if (currentPublication .isPresent ()) {
1692
- currentPublication .get ().cancel (reason );
1693
- }
1690
+ currentPublication .ifPresent (coordinatorPublication -> coordinatorPublication .cancel (reason ));
1694
1691
}
1695
1692
1696
1693
public Collection <BiConsumer <DiscoveryNode , ClusterState >> getOnJoinValidators () {
@@ -1863,7 +1860,7 @@ class CoordinatorPublication extends Publication {
1863
1860
1864
1861
private final ClusterStatePublicationEvent clusterStatePublicationEvent ;
1865
1862
private final PublishRequest publishRequest ;
1866
- private final ListenableFuture <Void > localNodeAckEvent ;
1863
+ private final SubscribableListener <Void > localNodeAckEvent ;
1867
1864
private final AckListener ackListener ;
1868
1865
private final ActionListener <Void > publishListener ;
1869
1866
private final PublicationTransportHandler .PublicationContext publicationContext ;
@@ -1881,7 +1878,7 @@ class CoordinatorPublication extends Publication {
1881
1878
ClusterStatePublicationEvent clusterStatePublicationEvent ,
1882
1879
PublishRequest publishRequest ,
1883
1880
PublicationTransportHandler .PublicationContext publicationContext ,
1884
- ListenableFuture <Void > localNodeAckEvent ,
1881
+ SubscribableListener <Void > localNodeAckEvent ,
1885
1882
AckListener ackListener ,
1886
1883
ActionListener <Void > publishListener
1887
1884
) {
@@ -1981,81 +1978,77 @@ public void onResponse(Void ignore) {
1981
1978
assert receivedJoinsProcessed == false ;
1982
1979
receivedJoinsProcessed = true ;
1983
1980
1984
- clusterApplier .onNewClusterState (
1985
- CoordinatorPublication .this .toString (),
1986
- () -> applierState ,
1987
- new ActionListener <Void >() {
1988
- @ Override
1989
- public void onFailure (Exception e ) {
1990
- synchronized (mutex ) {
1991
- removePublicationAndPossiblyBecomeCandidate ("clusterApplier#onNewClusterState" );
1992
- }
1993
- cancelTimeoutHandlers ();
1994
- ackListener .onNodeAck (getLocalNode (), e );
1995
- publishListener .onFailure (e );
1981
+ clusterApplier .onNewClusterState (CoordinatorPublication .this .toString (), () -> applierState , new ActionListener <>() {
1982
+ @ Override
1983
+ public void onFailure (Exception e ) {
1984
+ synchronized (mutex ) {
1985
+ removePublicationAndPossiblyBecomeCandidate ("clusterApplier#onNewClusterState" );
1996
1986
}
1987
+ cancelTimeoutHandlers ();
1988
+ ackListener .onNodeAck (getLocalNode (), e );
1989
+ publishListener .onFailure (e );
1990
+ }
1997
1991
1998
- @ Override
1999
- public void onResponse (Void ignored ) {
2000
- onClusterStateApplied ();
2001
- clusterStatePublicationEvent .setMasterApplyElapsedMillis (
2002
- transportService .getThreadPool ().rawRelativeTimeInMillis () - completionTimeMillis
2003
- );
2004
- synchronized (mutex ) {
2005
- assert currentPublication .get () == CoordinatorPublication .this ;
2006
- currentPublication = Optional .empty ();
2007
- logger .debug ("publication ended successfully: {}" , CoordinatorPublication .this );
2008
- // trigger term bump if new term was found during publication
2009
- updateMaxTermSeen (getCurrentTerm ());
2010
-
2011
- if (mode == Mode .LEADER ) {
2012
- // if necessary, abdicate to another node or improve the voting configuration
2013
- boolean attemptReconfiguration = true ;
2014
- final ClusterState state = getLastAcceptedState (); // committed state
2015
- if (localNodeMayWinElection (state , electionStrategy ).mayWin () == false ) {
2016
- final List <DiscoveryNode > masterCandidates = completedNodes ().stream ()
2017
- .filter (DiscoveryNode ::isMasterNode )
2018
- .filter (node -> electionStrategy .nodeMayWinElection (state , node ).mayWin ())
2019
- .filter (node -> {
2020
- // check if master candidate would be able to get an election quorum if we were to
2021
- // abdicate to it. Assume that every node that completed the publication can provide
2022
- // a vote in that next election and has the latest state.
2023
- final long futureElectionTerm = state .term () + 1 ;
2024
- final VoteCollection futureVoteCollection = new VoteCollection ();
2025
- completedNodes ().forEach (
2026
- completedNode -> futureVoteCollection .addJoinVote (
2027
- new Join (completedNode , node , futureElectionTerm , state .term (), state .version ())
2028
- )
2029
- );
2030
- return electionStrategy .isElectionQuorum (
2031
- node ,
2032
- futureElectionTerm ,
2033
- state .term (),
2034
- state .version (),
2035
- state .getLastCommittedConfiguration (),
2036
- state .getLastAcceptedConfiguration (),
2037
- futureVoteCollection
2038
- );
2039
- })
2040
- .toList ();
2041
- if (masterCandidates .isEmpty () == false ) {
2042
- abdicateTo (masterCandidates .get (random .nextInt (masterCandidates .size ())));
2043
- attemptReconfiguration = false ;
2044
- }
2045
- }
2046
- if (attemptReconfiguration ) {
2047
- scheduleReconfigurationIfNeeded ();
1992
+ @ Override
1993
+ public void onResponse (Void ignored ) {
1994
+ onClusterStateApplied ();
1995
+ clusterStatePublicationEvent .setMasterApplyElapsedMillis (
1996
+ transportService .getThreadPool ().rawRelativeTimeInMillis () - completionTimeMillis
1997
+ );
1998
+ synchronized (mutex ) {
1999
+ assert currentPublication .get () == CoordinatorPublication .this ;
2000
+ currentPublication = Optional .empty ();
2001
+ logger .debug ("publication ended successfully: {}" , CoordinatorPublication .this );
2002
+ // trigger term bump if new term was found during publication
2003
+ updateMaxTermSeen (getCurrentTerm ());
2004
+
2005
+ if (mode == Mode .LEADER ) {
2006
+ // if necessary, abdicate to another node or improve the voting configuration
2007
+ boolean attemptReconfiguration = true ;
2008
+ final ClusterState state = getLastAcceptedState (); // committed state
2009
+ if (localNodeMayWinElection (state , electionStrategy ).mayWin () == false ) {
2010
+ final List <DiscoveryNode > masterCandidates = completedNodes ().stream ()
2011
+ .filter (DiscoveryNode ::isMasterNode )
2012
+ .filter (node -> electionStrategy .nodeMayWinElection (state , node ).mayWin ())
2013
+ .filter (node -> {
2014
+ // check if master candidate would be able to get an election quorum if we were to
2015
+ // abdicate to it. Assume that every node that completed the publication can provide
2016
+ // a vote in that next election and has the latest state.
2017
+ final long futureElectionTerm = state .term () + 1 ;
2018
+ final VoteCollection futureVoteCollection = new VoteCollection ();
2019
+ completedNodes ().forEach (
2020
+ completedNode -> futureVoteCollection .addJoinVote (
2021
+ new Join (completedNode , node , futureElectionTerm , state .term (), state .version ())
2022
+ )
2023
+ );
2024
+ return electionStrategy .isElectionQuorum (
2025
+ node ,
2026
+ futureElectionTerm ,
2027
+ state .term (),
2028
+ state .version (),
2029
+ state .getLastCommittedConfiguration (),
2030
+ state .getLastAcceptedConfiguration (),
2031
+ futureVoteCollection
2032
+ );
2033
+ })
2034
+ .toList ();
2035
+ if (masterCandidates .isEmpty () == false ) {
2036
+ abdicateTo (masterCandidates .get (random .nextInt (masterCandidates .size ())));
2037
+ attemptReconfiguration = false ;
2048
2038
}
2049
2039
}
2050
- lagDetector .startLagDetector (publishRequest .getAcceptedState ().version ());
2051
- logIncompleteNodes (Level .WARN );
2040
+ if (attemptReconfiguration ) {
2041
+ scheduleReconfigurationIfNeeded ();
2042
+ }
2052
2043
}
2053
- cancelTimeoutHandlers ();
2054
- ackListener .onNodeAck (getLocalNode (), null );
2055
- publishListener .onResponse (null );
2044
+ lagDetector .startLagDetector (publishRequest .getAcceptedState ().version ());
2045
+ logIncompleteNodes (Level .WARN );
2056
2046
}
2047
+ cancelTimeoutHandlers ();
2048
+ ackListener .onNodeAck (getLocalNode (), null );
2049
+ publishListener .onResponse (null );
2057
2050
}
2058
- );
2051
+ } );
2059
2052
}
2060
2053
2061
2054
@ Override
0 commit comments