Skip to content

Commit 9a28516

Browse files
authored
Forbid null ack timeouts (#113443)
We've asserted that `ackTimeout` is non-null for a while now without issues, so this commit adds code to enforce this invariant.
1 parent ca4bf1a commit 9a28516

File tree

4 files changed: 22 additions and 32 deletions.

server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.common.Priority;
1616
import org.elasticsearch.core.TimeValue;
1717

18+
import java.util.Objects;
19+
1820
/**
1921
* An extension interface to {@link ClusterStateUpdateTask} that allows the caller to be notified after the master has
2022
* computed, published, accepted, committed, and applied the cluster state update AND only after the rest of the nodes
@@ -54,7 +56,7 @@ protected AckedClusterStateUpdateTask(
5456
) {
5557
super(priority, masterNodeTimeout);
5658
this.listener = (ActionListener<AcknowledgedResponse>) listener;
57-
this.ackTimeout = ackTimeout;
59+
this.ackTimeout = Objects.requireNonNull(ackTimeout);
5860
}
5961

6062
/**
@@ -81,10 +83,6 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) {
8183
return AcknowledgedResponse.of(acknowledged);
8284
}
8385

84-
/**
85-
* Called once the acknowledgement timeout defined by
86-
* {@link AckedClusterStateUpdateTask#ackTimeout()} has expired
87-
*/
8886
public void onAckTimeout() {
8987
listener.onResponse(newResponse(false));
9088
}
@@ -94,9 +92,6 @@ public void onFailure(Exception e) {
9492
listener.onFailure(e);
9593
}
9694

97-
/**
98-
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
99-
*/
10095
public final TimeValue ackTimeout() {
10196
return ackTimeout;
10297
}

server/src/main/java/org/elasticsearch/cluster/ClusterStateAckListener.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,37 @@
2020
public interface ClusterStateAckListener {
2121

2222
/**
23-
* Called to determine which nodes the acknowledgement is expected from.
23+
* Called to determine the nodes from which an acknowledgement is expected.
24+
* <p>
25+
* This method will be called multiple times to determine the set of acking nodes, so it is crucial for it to return consistent results:
26+
* Given the same listener instance and the same node parameter, the method implementation should return the same result.
2427
*
25-
* As this method will be called multiple times to determine the set of acking nodes,
26-
* it is crucial for it to return consistent results: Given the same listener instance
27-
* and the same node parameter, the method implementation should return the same result.
28-
*
29-
* @param discoveryNode a node
30-
* @return true if the node is expected to send ack back, false otherwise
28+
* @return {@code true} if and only if this task will wait for an ack from the given node.
3129
*/
3230
boolean mustAck(DiscoveryNode discoveryNode);
3331

3432
/**
35-
* Called once all the nodes have acknowledged the cluster state update request. Must be
36-
* very lightweight execution, since it gets executed on the cluster service thread.
33+
* Called once all the selected nodes have acknowledged the cluster state update request. Must be very lightweight execution, since it
34+
* is executed on the cluster service thread.
3735
*/
3836
void onAllNodesAcked();
3937

4038
/**
41-
* Called after all the nodes have acknowledged the cluster state update request but at least one of them failed. Must be
42-
* very lightweight execution, since it gets executed on the cluster service thread.
39+
* Called after all the nodes have acknowledged the cluster state update request but at least one of them failed. Must be very
40+
* lightweight execution, since it is executed on the cluster service thread.
4341
*
44-
* @param e optional error that might have been thrown
42+
* @param e exception representing the failure.
4543
*/
4644
void onAckFailure(Exception e);
4745

4846
/**
49-
* Called once the acknowledgement timeout defined by
50-
* {@link AckedClusterStateUpdateTask#ackTimeout()} has expired
47+
* Called if the acknowledgement timeout defined by {@link ClusterStateAckListener#ackTimeout()} expires while still waiting for acks.
5148
*/
5249
void onAckTimeout();
5350

5451
/**
55-
* @return acknowledgement timeout, maximum time interval to wait for acknowledgements
52+
* @return acknowledgement timeout, i.e. the maximum time interval to wait for acknowledgements. Return {@link TimeValue#MINUS_ONE} if
53+
* the request should wait indefinitely for acknowledgements.
5654
*/
5755
TimeValue ackTimeout();
5856

server/src/main/java/org/elasticsearch/cluster/service/MasterService.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.elasticsearch.core.Releasables;
4848
import org.elasticsearch.core.SuppressForbidden;
4949
import org.elasticsearch.core.TimeValue;
50-
import org.elasticsearch.core.UpdateForV9;
5150
import org.elasticsearch.node.Node;
5251
import org.elasticsearch.tasks.Task;
5352
import org.elasticsearch.tasks.TaskAwareRequest;
@@ -656,7 +655,7 @@ public void onAckSuccess() {
656655
}
657656
}
658657

659-
public void onAckFailure(@Nullable Exception e) {
658+
public void onAckFailure(Exception e) {
660659
try (ThreadContext.StoredContext ignore = context.get()) {
661660
restoreResponseHeaders.run();
662661
listener.onAckFailure(e);
@@ -688,6 +687,7 @@ public TimeValue ackTimeout() {
688687
private static class TaskAckListener {
689688

690689
private final ContextPreservingAckListener contextPreservingAckListener;
690+
private final TimeValue ackTimeout;
691691
private final CountDown countDown;
692692
private final DiscoveryNode masterNode;
693693
private final ThreadPool threadPool;
@@ -702,6 +702,7 @@ private static class TaskAckListener {
702702
ThreadPool threadPool
703703
) {
704704
this.contextPreservingAckListener = contextPreservingAckListener;
705+
this.ackTimeout = Objects.requireNonNull(contextPreservingAckListener.ackTimeout());
705706
this.clusterStateVersion = clusterStateVersion;
706707
this.threadPool = threadPool;
707708
this.masterNode = nodes.getMasterNode();
@@ -716,14 +717,7 @@ private static class TaskAckListener {
716717
this.countDown = new CountDown(countDown + 1); // we also wait for onCommit to be called
717718
}
718719

719-
@UpdateForV9 // properly forbid ackTimeout == null after enough time has passed to be sure it's not used in production
720720
public void onCommit(TimeValue commitTime) {
721-
TimeValue ackTimeout = contextPreservingAckListener.ackTimeout();
722-
if (ackTimeout == null) {
723-
assert false : "ackTimeout must always be present: " + contextPreservingAckListener;
724-
ackTimeout = TimeValue.ZERO;
725-
}
726-
727721
if (ackTimeout.millis() < 0) {
728722
if (countDown.countDown()) {
729723
finish();

server/src/main/java/org/elasticsearch/cluster/service/MasterServiceTaskQueue.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public interface MasterServiceTaskQueue<T extends ClusterStateTaskListener> {
3636
* ?master_timeout}, which is typically available from {@link MasterNodeRequest#masterNodeTimeout()}. Tasks that
3737
* correspond with internal actions should normally have no timeout since it is usually better to wait patiently in the
3838
* queue until processed rather than to fail, especially if the only reasonable reaction to a failure is to retry.
39+
* <p>
40+
* The values {@code null} and {@link MasterNodeRequest#INFINITE_MASTER_NODE_TIMEOUT} (i.e. {@link TimeValue#MINUS_ONE})
41+
* both indicate that the task should never time out.
3942
*/
4043
void submitTask(String source, T task, @Nullable TimeValue timeout);
4144
}

0 commit comments

Comments
 (0)