From 20f4aca513ea218e453d7ef96add458933dc13d0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 5 Aug 2025 13:33:21 +0100 Subject: [PATCH] Misc cleanups in Coordinator Mainly, switching a `ListenableFuture` for a `SubscribableListener` to avoid the unnecessary exception-mangling. But also cleaning up some other IDE warnings. --- .../cluster/coordination/Coordinator.java | 161 +++++++++--------- 1 file changed, 77 insertions(+), 84 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3e474f07b43db..1976bda6c6aba 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.FixForMultiProject; @@ -355,9 +354,9 @@ private void onLeaderFailure(Supplier message, Exception e) { assert lastKnownLeader.isPresent(); if (logger.isDebugEnabled()) { // TODO this is a workaround for log4j's Supplier. We should remove this, once using ES logging api - logger.info(() -> message.get(), e); + logger.info(message::get, e); } else { - logger.info(() -> message.get()); + logger.info(message::get); } } becomeCandidate("onLeaderFailure"); @@ -514,7 +513,7 @@ private void closePrevotingRound() { /** * Updates {@link #maxTermSeen} if greater. - * + *
<p>
* Every time a new term is found, either from another node requesting election, or this node trying to run for election, always update * the max term number. The max term may not reflect an actual election, but rather an election attempt by some node in the * cluster. @@ -524,7 +523,7 @@ private void updateMaxTermSeen(final long term) { maxTermSeen = Math.max(maxTermSeen, term); final long currentTerm = getCurrentTerm(); if (mode == Mode.LEADER && maxTermSeen > currentTerm) { - // Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that + // Bump our term. However, if there is a publication in flight then doing so would cancel the publication, so don't do that // since we check whether a term bump is needed at the end of the publication too. if (publicationInProgress()) { logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm); @@ -794,8 +793,8 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener jo synchronized (mutex) { updateMaxTermSeen(joinRequest.getTerm()); - final CoordinationState coordState = coordinationState.get(); - final boolean prevElectionWon = coordState.electionWon() + final CoordinationState localCoordinationState = coordinationState.get(); + final boolean previousElectionWon = localCoordinationState.electionWon() && optionalJoin.stream().allMatch(j -> j.term() <= getCurrentTerm()); optionalJoin.ifPresent(this::handleJoin); @@ -806,7 +805,7 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener jo joinListener ); - if (prevElectionWon == false && coordState.electionWon()) { + if (previousElectionWon == false && localCoordinationState.electionWon()) { becomeLeader(); } } @@ -1616,7 +1615,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) clusterStatePublicationEvent, publishRequest, publicationContext, - new ListenableFuture<>(), + new 
SubscribableListener<>(), ackListener, publishListener ); @@ -1653,7 +1652,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) private boolean assertPreviousStateConsistency(ClusterStatePublicationEvent clusterStatePublicationEvent) { if (clusterStatePublicationEvent.getOldState() != coordinationState.get().getLastAcceptedState()) { // compare JSON representations - @FixForMultiProject // this is just so the toXContent doesnt throw - we want the same contents, but dont care if it's MP or not + @FixForMultiProject // this is just so toXContent doesn't throw - we want the same contents, but don't care if it's MP or not ToXContent.Params params = new ToXContent.MapParams(Map.of("multi-project", "true")); String oldState = Strings.toString(ChunkedToXContent.wrapAsToXContent(clusterStatePublicationEvent.getOldState()), params); @@ -1669,7 +1668,7 @@ private boolean assertPreviousStateConsistency(ClusterStatePublicationEvent clus } private ActionListener wrapWithMutex(ActionListener listener) { - return new ActionListener() { + return new ActionListener<>() { @Override public void onResponse(T t) { synchronized (mutex) { @@ -1688,9 +1687,7 @@ public void onFailure(Exception e) { private void cancelActivePublication(String reason) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; - if (currentPublication.isPresent()) { - currentPublication.get().cancel(reason); - } + currentPublication.ifPresent(coordinatorPublication -> coordinatorPublication.cancel(reason)); } public Collection> getOnJoinValidators() { @@ -1863,7 +1860,7 @@ class CoordinatorPublication extends Publication { private final ClusterStatePublicationEvent clusterStatePublicationEvent; private final PublishRequest publishRequest; - private final ListenableFuture localNodeAckEvent; + private final SubscribableListener localNodeAckEvent; private final AckListener ackListener; private final ActionListener publishListener; private final 
PublicationTransportHandler.PublicationContext publicationContext; @@ -1881,7 +1878,7 @@ class CoordinatorPublication extends Publication { ClusterStatePublicationEvent clusterStatePublicationEvent, PublishRequest publishRequest, PublicationTransportHandler.PublicationContext publicationContext, - ListenableFuture localNodeAckEvent, + SubscribableListener localNodeAckEvent, AckListener ackListener, ActionListener publishListener ) { @@ -1981,81 +1978,77 @@ public void onResponse(Void ignore) { assert receivedJoinsProcessed == false; receivedJoinsProcessed = true; - clusterApplier.onNewClusterState( - CoordinatorPublication.this.toString(), - () -> applierState, - new ActionListener() { - @Override - public void onFailure(Exception e) { - synchronized (mutex) { - removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); - } - cancelTimeoutHandlers(); - ackListener.onNodeAck(getLocalNode(), e); - publishListener.onFailure(e); + clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState, new ActionListener<>() { + @Override + public void onFailure(Exception e) { + synchronized (mutex) { + removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); } + cancelTimeoutHandlers(); + ackListener.onNodeAck(getLocalNode(), e); + publishListener.onFailure(e); + } - @Override - public void onResponse(Void ignored) { - onClusterStateApplied(); - clusterStatePublicationEvent.setMasterApplyElapsedMillis( - transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis - ); - synchronized (mutex) { - assert currentPublication.get() == CoordinatorPublication.this; - currentPublication = Optional.empty(); - logger.debug("publication ended successfully: {}", CoordinatorPublication.this); - // trigger term bump if new term was found during publication - updateMaxTermSeen(getCurrentTerm()); - - if (mode == Mode.LEADER) { - // if necessary, abdicate to another node or improve the voting 
configuration - boolean attemptReconfiguration = true; - final ClusterState state = getLastAcceptedState(); // committed state - if (localNodeMayWinElection(state, electionStrategy).mayWin() == false) { - final List masterCandidates = completedNodes().stream() - .filter(DiscoveryNode::isMasterNode) - .filter(node -> electionStrategy.nodeMayWinElection(state, node).mayWin()) - .filter(node -> { - // check if master candidate would be able to get an election quorum if we were to - // abdicate to it. Assume that every node that completed the publication can provide - // a vote in that next election and has the latest state. - final long futureElectionTerm = state.term() + 1; - final VoteCollection futureVoteCollection = new VoteCollection(); - completedNodes().forEach( - completedNode -> futureVoteCollection.addJoinVote( - new Join(completedNode, node, futureElectionTerm, state.term(), state.version()) - ) - ); - return electionStrategy.isElectionQuorum( - node, - futureElectionTerm, - state.term(), - state.version(), - state.getLastCommittedConfiguration(), - state.getLastAcceptedConfiguration(), - futureVoteCollection - ); - }) - .toList(); - if (masterCandidates.isEmpty() == false) { - abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size()))); - attemptReconfiguration = false; - } - } - if (attemptReconfiguration) { - scheduleReconfigurationIfNeeded(); + @Override + public void onResponse(Void ignored) { + onClusterStateApplied(); + clusterStatePublicationEvent.setMasterApplyElapsedMillis( + transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis + ); + synchronized (mutex) { + assert currentPublication.get() == CoordinatorPublication.this; + currentPublication = Optional.empty(); + logger.debug("publication ended successfully: {}", CoordinatorPublication.this); + // trigger term bump if new term was found during publication + updateMaxTermSeen(getCurrentTerm()); + + if (mode == Mode.LEADER) { + // if necessary, abdicate to 
another node or improve the voting configuration + boolean attemptReconfiguration = true; + final ClusterState state = getLastAcceptedState(); // committed state + if (localNodeMayWinElection(state, electionStrategy).mayWin() == false) { + final List masterCandidates = completedNodes().stream() + .filter(DiscoveryNode::isMasterNode) + .filter(node -> electionStrategy.nodeMayWinElection(state, node).mayWin()) + .filter(node -> { + // check if master candidate would be able to get an election quorum if we were to + // abdicate to it. Assume that every node that completed the publication can provide + // a vote in that next election and has the latest state. + final long futureElectionTerm = state.term() + 1; + final VoteCollection futureVoteCollection = new VoteCollection(); + completedNodes().forEach( + completedNode -> futureVoteCollection.addJoinVote( + new Join(completedNode, node, futureElectionTerm, state.term(), state.version()) + ) + ); + return electionStrategy.isElectionQuorum( + node, + futureElectionTerm, + state.term(), + state.version(), + state.getLastCommittedConfiguration(), + state.getLastAcceptedConfiguration(), + futureVoteCollection + ); + }) + .toList(); + if (masterCandidates.isEmpty() == false) { + abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size()))); + attemptReconfiguration = false; } } - lagDetector.startLagDetector(publishRequest.getAcceptedState().version()); - logIncompleteNodes(Level.WARN); + if (attemptReconfiguration) { + scheduleReconfigurationIfNeeded(); + } } - cancelTimeoutHandlers(); - ackListener.onNodeAck(getLocalNode(), null); - publishListener.onResponse(null); + lagDetector.startLagDetector(publishRequest.getAcceptedState().version()); + logIncompleteNodes(Level.WARN); } + cancelTimeoutHandlers(); + ackListener.onNodeAck(getLocalNode(), null); + publishListener.onResponse(null); } - ); + }); } @Override