Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.FixForMultiProject;
Expand Down Expand Up @@ -355,9 +354,9 @@ private void onLeaderFailure(Supplier<String> message, Exception e) {
assert lastKnownLeader.isPresent();
if (logger.isDebugEnabled()) {
// TODO this is a workaround for log4j's Supplier. We should remove this, once using ES logging api
logger.info(() -> message.get(), e);
logger.info(message::get, e);
} else {
logger.info(() -> message.get());
logger.info(message::get);
}
}
becomeCandidate("onLeaderFailure");
Expand Down Expand Up @@ -514,7 +513,7 @@ private void closePrevotingRound() {

/**
* Updates {@link #maxTermSeen} if greater.
*
* <p>
* Every time a new term is found, either from another node requesting election, or this node trying to run for election, always update
* the max term number. The max term may not reflect an actual election, but rather an election attempt by some node in the
* cluster.
Expand All @@ -524,7 +523,7 @@ private void updateMaxTermSeen(final long term) {
maxTermSeen = Math.max(maxTermSeen, term);
final long currentTerm = getCurrentTerm();
if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
// Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
// Bump our term. However, if there is a publication in flight then doing so would cancel the publication, so don't do that
// since we check whether a term bump is needed at the end of the publication too.
if (publicationInProgress()) {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
Expand Down Expand Up @@ -794,8 +793,8 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> jo
synchronized (mutex) {
updateMaxTermSeen(joinRequest.getTerm());

final CoordinationState coordState = coordinationState.get();
final boolean prevElectionWon = coordState.electionWon()
final CoordinationState localCoordinationState = coordinationState.get();
final boolean previousElectionWon = localCoordinationState.electionWon()
&& optionalJoin.stream().allMatch(j -> j.term() <= getCurrentTerm());

optionalJoin.ifPresent(this::handleJoin);
Expand All @@ -806,7 +805,7 @@ private void processJoinRequest(JoinRequest joinRequest, ActionListener<Void> jo
joinListener
);

if (prevElectionWon == false && coordState.electionWon()) {
if (previousElectionWon == false && localCoordinationState.electionWon()) {
becomeLeader();
}
}
Expand Down Expand Up @@ -1616,7 +1615,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
clusterStatePublicationEvent,
publishRequest,
publicationContext,
new ListenableFuture<>(),
new SubscribableListener<>(),
ackListener,
publishListener
);
Expand Down Expand Up @@ -1653,7 +1652,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
private boolean assertPreviousStateConsistency(ClusterStatePublicationEvent clusterStatePublicationEvent) {
if (clusterStatePublicationEvent.getOldState() != coordinationState.get().getLastAcceptedState()) {
// compare JSON representations
@FixForMultiProject // this is just so the toXContent doesnt throw - we want the same contents, but dont care if it's MP or not
@FixForMultiProject // this is just so toXContent doesn't throw - we want the same contents, but don't care if it's MP or not
ToXContent.Params params = new ToXContent.MapParams(Map.of("multi-project", "true"));

String oldState = Strings.toString(ChunkedToXContent.wrapAsToXContent(clusterStatePublicationEvent.getOldState()), params);
Expand All @@ -1669,7 +1668,7 @@ private boolean assertPreviousStateConsistency(ClusterStatePublicationEvent clus
}

private <T> ActionListener<T> wrapWithMutex(ActionListener<T> listener) {
return new ActionListener<T>() {
return new ActionListener<>() {
@Override
public void onResponse(T t) {
synchronized (mutex) {
Expand All @@ -1688,9 +1687,7 @@ public void onFailure(Exception e) {

private void cancelActivePublication(String reason) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
if (currentPublication.isPresent()) {
currentPublication.get().cancel(reason);
}
currentPublication.ifPresent(coordinatorPublication -> coordinatorPublication.cancel(reason));
}

public Collection<BiConsumer<DiscoveryNode, ClusterState>> getOnJoinValidators() {
Expand Down Expand Up @@ -1863,7 +1860,7 @@ class CoordinatorPublication extends Publication {

private final ClusterStatePublicationEvent clusterStatePublicationEvent;
private final PublishRequest publishRequest;
private final ListenableFuture<Void> localNodeAckEvent;
private final SubscribableListener<Void> localNodeAckEvent;
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
private final PublicationTransportHandler.PublicationContext publicationContext;
Expand All @@ -1881,7 +1878,7 @@ class CoordinatorPublication extends Publication {
ClusterStatePublicationEvent clusterStatePublicationEvent,
PublishRequest publishRequest,
PublicationTransportHandler.PublicationContext publicationContext,
ListenableFuture<Void> localNodeAckEvent,
SubscribableListener<Void> localNodeAckEvent,
AckListener ackListener,
ActionListener<Void> publishListener
) {
Expand Down Expand Up @@ -1981,81 +1978,77 @@ public void onResponse(Void ignore) {
assert receivedJoinsProcessed == false;
receivedJoinsProcessed = true;

clusterApplier.onNewClusterState(
CoordinatorPublication.this.toString(),
() -> applierState,
new ActionListener<Void>() {
@Override
public void onFailure(Exception e) {
synchronized (mutex) {
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
cancelTimeoutHandlers();
ackListener.onNodeAck(getLocalNode(), e);
publishListener.onFailure(e);
clusterApplier.onNewClusterState(CoordinatorPublication.this.toString(), () -> applierState, new ActionListener<>() {
@Override
public void onFailure(Exception e) {
synchronized (mutex) {
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
cancelTimeoutHandlers();
ackListener.onNodeAck(getLocalNode(), e);
publishListener.onFailure(e);
}

@Override
public void onResponse(Void ignored) {
onClusterStateApplied();
clusterStatePublicationEvent.setMasterApplyElapsedMillis(
transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis
);
synchronized (mutex) {
assert currentPublication.get() == CoordinatorPublication.this;
currentPublication = Optional.empty();
logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
// if necessary, abdicate to another node or improve the voting configuration
boolean attemptReconfiguration = true;
final ClusterState state = getLastAcceptedState(); // committed state
if (localNodeMayWinElection(state, electionStrategy).mayWin() == false) {
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
.filter(DiscoveryNode::isMasterNode)
.filter(node -> electionStrategy.nodeMayWinElection(state, node).mayWin())
.filter(node -> {
// check if master candidate would be able to get an election quorum if we were to
// abdicate to it. Assume that every node that completed the publication can provide
// a vote in that next election and has the latest state.
final long futureElectionTerm = state.term() + 1;
final VoteCollection futureVoteCollection = new VoteCollection();
completedNodes().forEach(
completedNode -> futureVoteCollection.addJoinVote(
new Join(completedNode, node, futureElectionTerm, state.term(), state.version())
)
);
return electionStrategy.isElectionQuorum(
node,
futureElectionTerm,
state.term(),
state.version(),
state.getLastCommittedConfiguration(),
state.getLastAcceptedConfiguration(),
futureVoteCollection
);
})
.toList();
if (masterCandidates.isEmpty() == false) {
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
attemptReconfiguration = false;
}
}
if (attemptReconfiguration) {
scheduleReconfigurationIfNeeded();
@Override
public void onResponse(Void ignored) {
onClusterStateApplied();
clusterStatePublicationEvent.setMasterApplyElapsedMillis(
transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis
);
synchronized (mutex) {
assert currentPublication.get() == CoordinatorPublication.this;
currentPublication = Optional.empty();
logger.debug("publication ended successfully: {}", CoordinatorPublication.this);
// trigger term bump if new term was found during publication
updateMaxTermSeen(getCurrentTerm());

if (mode == Mode.LEADER) {
// if necessary, abdicate to another node or improve the voting configuration
boolean attemptReconfiguration = true;
final ClusterState state = getLastAcceptedState(); // committed state
if (localNodeMayWinElection(state, electionStrategy).mayWin() == false) {
final List<DiscoveryNode> masterCandidates = completedNodes().stream()
.filter(DiscoveryNode::isMasterNode)
.filter(node -> electionStrategy.nodeMayWinElection(state, node).mayWin())
.filter(node -> {
// check if master candidate would be able to get an election quorum if we were to
// abdicate to it. Assume that every node that completed the publication can provide
// a vote in that next election and has the latest state.
final long futureElectionTerm = state.term() + 1;
final VoteCollection futureVoteCollection = new VoteCollection();
completedNodes().forEach(
completedNode -> futureVoteCollection.addJoinVote(
new Join(completedNode, node, futureElectionTerm, state.term(), state.version())
)
);
return electionStrategy.isElectionQuorum(
node,
futureElectionTerm,
state.term(),
state.version(),
state.getLastCommittedConfiguration(),
state.getLastAcceptedConfiguration(),
futureVoteCollection
);
})
.toList();
if (masterCandidates.isEmpty() == false) {
abdicateTo(masterCandidates.get(random.nextInt(masterCandidates.size())));
attemptReconfiguration = false;
}
}
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
logIncompleteNodes(Level.WARN);
if (attemptReconfiguration) {
scheduleReconfigurationIfNeeded();
}
}
cancelTimeoutHandlers();
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
logIncompleteNodes(Level.WARN);
}
cancelTimeoutHandlers();
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
}
);
});
}

@Override
Expand Down