Skip to content

Commit aba6c68

Browse files
authored
Asyncify ElectionStrategy#beforeCommit (#94582)
Adjusts `ElectionStrategy#beforeCommit` to accept a listener, because the stateless implementation will need to call into some async code.
1 parent 4a93dba commit aba6c68

File tree

5 files changed

+67
-15
lines changed

5 files changed

+67
-15
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1690,8 +1690,8 @@ boolean cancelCommittedPublication() {
16901690
}
16911691
}
16921692

1693-
void beforeCommit(long term, long version) {
1694-
electionStrategy.beforeCommit(term, version);
1693+
private void beforeCommit(long term, long version, ActionListener<Void> listener) {
1694+
electionStrategy.beforeCommit(term, version, listener);
16951695
}
16961696

16971697
class CoordinatorPublication extends Publication {
@@ -1927,12 +1927,17 @@ protected boolean isPublishQuorum(VoteCollection votes) {
19271927
}
19281928

19291929
@Override
1930-
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
1930+
protected Optional<ListenableFuture<ApplyCommitRequest>> handlePublishResponse(
1931+
DiscoveryNode sourceNode,
1932+
PublishResponse publishResponse
1933+
) {
19311934
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
19321935
assert getCurrentTerm() >= publishResponse.getTerm();
1933-
var applyCommit = coordinationState.get().handlePublishResponse(sourceNode, publishResponse);
1934-
applyCommit.ifPresent(applyCommitRequest -> beforeCommit(applyCommitRequest.getTerm(), applyCommitRequest.getVersion()));
1935-
return applyCommit;
1936+
return coordinationState.get().handlePublishResponse(sourceNode, publishResponse).map(applyCommitRequest -> {
1937+
final var future = new ListenableFuture<ApplyCommitRequest>();
1938+
beforeCommit(applyCommitRequest.getTerm(), applyCommitRequest.getVersion(), future.map(ignored -> applyCommitRequest));
1939+
return future;
1940+
});
19361941
}
19371942

19381943
@Override
@@ -1981,6 +1986,7 @@ protected void sendApplyCommit(
19811986
ActionListener<Empty> responseActionListener
19821987
) {
19831988
assert transportService.getThreadPool().getThreadContext().isSystemContext();
1989+
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
19841990
try {
19851991
transportService.sendRequest(
19861992
destination,
@@ -1997,6 +2003,17 @@ protected void sendApplyCommit(
19972003
responseActionListener.onFailure(e);
19982004
}
19992005
}
2006+
2007+
@Override
2008+
protected <T> ActionListener<T> wrapListener(ActionListener<T> listener) {
2009+
return wrapWithMutex(listener);
2010+
}
2011+
2012+
@Override
2013+
boolean publicationCompletedIffAllTargetsInactiveOrCancelled() {
2014+
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
2015+
return super.publicationCompletedIffAllTargetsInactiveOrCancelled();
2016+
}
20002017
}
20012018

20022019
public interface PeerFinderListener {

server/src/main/java/org/elasticsearch/cluster/coordination/ElectionStrategy.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,7 @@ public boolean isInvalidReconfiguration(
105105
&& lastCommittedConfiguration.equals(lastAcceptedConfiguration) == false;
106106
}
107107

108-
public void beforeCommit(long term, long version) {}
108+
public void beforeCommit(long term, long version, ActionListener<Void> listener) {
109+
listener.onResponse(null);
110+
}
109111
}

server/src/main/java/org/elasticsearch/cluster/coordination/Publication.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.ClusterState;
1717
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1920
import org.elasticsearch.core.TimeValue;
2021
import org.elasticsearch.transport.TransportException;
2122
import org.elasticsearch.transport.TransportResponse;
@@ -37,7 +38,7 @@ public abstract class Publication {
3738
private final LongSupplier currentTimeSupplier;
3839
private final long startTime;
3940

40-
private Optional<ApplyCommitRequest> applyCommitRequest; // set when state is committed
41+
private Optional<ListenableFuture<ApplyCommitRequest>> applyCommitRequest; // set when state is committed
4142
private boolean isCompleted; // set when publication is completed
4243
private boolean cancelled; // set when publication is cancelled
4344

@@ -122,7 +123,7 @@ private void onPossibleCompletion() {
122123
}
123124

124125
// For assertions only: verify that this invariant holds
125-
private boolean publicationCompletedIffAllTargetsInactiveOrCancelled() {
126+
boolean publicationCompletedIffAllTargetsInactiveOrCancelled() {
126127
if (cancelled == false) {
127128
for (final PublicationTarget target : publicationTargets) {
128129
if (target.isActive()) {
@@ -173,7 +174,10 @@ protected final long getStartTime() {
173174

174175
protected abstract boolean isPublishQuorum(CoordinationState.VoteCollection votes);
175176

176-
protected abstract Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse);
177+
protected abstract Optional<ListenableFuture<ApplyCommitRequest>> handlePublishResponse(
178+
DiscoveryNode sourceNode,
179+
PublishResponse publishResponse
180+
);
177181

178182
protected abstract void onJoin(Join join);
179183

@@ -191,6 +195,8 @@ protected abstract void sendApplyCommit(
191195
ActionListener<TransportResponse.Empty> responseActionListener
192196
);
193197

198+
protected abstract <T> ActionListener<T> wrapListener(ActionListener<T> listener);
199+
194200
@Override
195201
public String toString() {
196202
return "Publication{term="
@@ -280,7 +286,19 @@ void sendApplyCommit() {
280286
assert state == PublicationTargetState.WAITING_FOR_QUORUM : state + " -> " + PublicationTargetState.SENT_APPLY_COMMIT;
281287
state = PublicationTargetState.SENT_APPLY_COMMIT;
282288
assert applyCommitRequest.isPresent();
283-
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest.get(), new ApplyCommitResponseHandler());
289+
applyCommitRequest.get().addListener(wrapListener(new ActionListener<>() {
290+
@Override
291+
public void onResponse(ApplyCommitRequest applyCommitRequest) {
292+
Publication.this.sendApplyCommit(discoveryNode, applyCommitRequest, new ApplyCommitResponseHandler());
293+
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
294+
}
295+
296+
@Override
297+
public void onFailure(Exception e) {
298+
setFailed(e);
299+
onPossibleCommitFailure();
300+
}
301+
}));
284302
assert publicationCompletedIffAllTargetsInactiveOrCancelled();
285303
}
286304

server/src/test/java/org/elasticsearch/cluster/coordination/AtomicRegisterCoordinatorTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,11 +515,13 @@ public boolean isInvalidReconfiguration(
515515
}
516516

517517
@Override
518-
public void beforeCommit(long term, long version) {
518+
public void beforeCommit(long term, long version, ActionListener<Void> listener) {
519519
// TODO: add a test to ensure that this gets called
520520
final var currentTermOwner = register.getTermOwner();
521521
if (currentTermOwner.term() > term) {
522-
throw new CoordinationStateRejectedException("Term " + term + " already claimed by another node");
522+
listener.onFailure(new CoordinationStateRejectedException("Term " + term + " already claimed by another node"));
523+
} else {
524+
listener.onResponse(null);
523525
}
524526
}
525527
}

server/src/test/java/org/elasticsearch/cluster/coordination/PublicationTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
1717
import org.elasticsearch.cluster.node.DiscoveryNodes;
1818
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1920
import org.elasticsearch.common.util.set.Sets;
2021
import org.elasticsearch.core.Nullable;
2122
import org.elasticsearch.core.TimeValue;
@@ -90,8 +91,20 @@ protected boolean isPublishQuorum(CoordinationState.VoteCollection votes) {
9091
}
9192

9293
@Override
93-
protected Optional<ApplyCommitRequest> handlePublishResponse(DiscoveryNode sourceNode, PublishResponse publishResponse) {
94-
return coordinationState.handlePublishResponse(sourceNode, publishResponse);
94+
protected Optional<ListenableFuture<ApplyCommitRequest>> handlePublishResponse(
95+
DiscoveryNode sourceNode,
96+
PublishResponse publishResponse
97+
) {
98+
return coordinationState.handlePublishResponse(sourceNode, publishResponse).map(applyCommitRequest -> {
99+
final var future = new ListenableFuture<ApplyCommitRequest>();
100+
future.onResponse(applyCommitRequest);
101+
return future;
102+
});
103+
}
104+
105+
@Override
106+
protected <T> ActionListener<T> wrapListener(ActionListener<T> listener) {
107+
return listener;
95108
}
96109
};
97110
currentPublication.start(faultyNodes);

0 commit comments

Comments
 (0)