Skip to content

Commit 422c26b

Browse files
committed
Drop useless AckedRequest interface (#113255)
Almost every implementation of `AckedRequest` is an `AcknowledgedRequest` too, and the distinction is rather confusing. Moreover the other implementations of `AckedRequest` are a potential source of `null` timeouts that we'd like to get rid of. This commit simplifies the situation by dropping the unnecessary `AckedRequest` interface entirely.
1 parent dd3ceb0 commit 422c26b

File tree

11 files changed

+83
-113
lines changed

11 files changed

+83
-113
lines changed

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.client.Response;
2020
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
2121
import org.elasticsearch.cluster.ClusterState;
22-
import org.elasticsearch.cluster.ack.AckedRequest;
2322
import org.elasticsearch.cluster.block.ClusterBlock;
2423
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2524
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -102,22 +101,9 @@ private void runTest(String actionName, String endpoint) throws Exception {
102101

103102
private void updateClusterState(Function<ClusterState, ClusterState> transformationFn) {
104103
final TimeValue timeout = TimeValue.timeValueSeconds(10);
105-
106-
final AckedRequest ackedRequest = new AckedRequest() {
107-
@Override
108-
public TimeValue ackTimeout() {
109-
return timeout;
110-
}
111-
112-
@Override
113-
public TimeValue masterNodeTimeout() {
114-
return timeout;
115-
}
116-
};
117-
118104
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
119105
internalCluster().getAnyMasterNodeInstance(ClusterService.class)
120-
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(ackedRequest, future) {
106+
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(timeout, timeout, future) {
121107
@Override
122108
public ClusterState execute(ClusterState currentState) throws Exception {
123109
return transformationFn.apply(currentState);

server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.action.support.master;
1010

1111
import org.elasticsearch.action.ActionRequestValidationException;
12-
import org.elasticsearch.cluster.ack.AckedRequest;
1312
import org.elasticsearch.common.io.stream.StreamInput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514
import org.elasticsearch.core.TimeValue;
@@ -23,9 +22,7 @@
2322
* Abstract base class for action requests that track acknowledgements of cluster state updates: such a request is acknowledged only once
2423
* the cluster state update is committed and all relevant nodes have applied it and acknowledged its application to the elected master..
2524
*/
26-
public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Request>> extends MasterNodeRequest<Request>
27-
implements
28-
AckedRequest {
25+
public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Request>> extends MasterNodeRequest<Request> {
2926

3027
public static final TimeValue DEFAULT_ACK_TIMEOUT = timeValueSeconds(30);
3128

@@ -74,7 +71,6 @@ public final Request ackTimeout(TimeValue ackTimeout) {
7471
/**
7572
* @return the current ack timeout as a {@link TimeValue}
7673
*/
77-
@Override
7874
public final TimeValue ackTimeout() {
7975
return ackTimeout;
8076
}

server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
package org.elasticsearch.cluster;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1213
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13-
import org.elasticsearch.cluster.ack.AckedRequest;
1414
import org.elasticsearch.cluster.node.DiscoveryNode;
1515
import org.elasticsearch.common.Priority;
1616
import org.elasticsearch.core.TimeValue;
@@ -23,21 +23,38 @@
2323
public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask implements ClusterStateAckListener {
2424

2525
private final ActionListener<AcknowledgedResponse> listener;
26-
private final AckedRequest request;
26+
private final TimeValue ackTimeout;
2727

28-
protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<? extends AcknowledgedResponse> listener) {
29-
this(Priority.NORMAL, request, listener);
28+
protected AckedClusterStateUpdateTask(AcknowledgedRequest<?> request, ActionListener<? extends AcknowledgedResponse> listener) {
29+
this(Priority.NORMAL, request.masterNodeTimeout(), request.ackTimeout(), listener);
30+
}
31+
32+
protected AckedClusterStateUpdateTask(
33+
TimeValue masterNodeTimeout,
34+
TimeValue ackTimeout,
35+
ActionListener<? extends AcknowledgedResponse> listener
36+
) {
37+
this(Priority.NORMAL, masterNodeTimeout, ackTimeout, listener);
38+
}
39+
40+
protected AckedClusterStateUpdateTask(
41+
Priority priority,
42+
AcknowledgedRequest<?> request,
43+
ActionListener<? extends AcknowledgedResponse> listener
44+
) {
45+
this(priority, request.masterNodeTimeout(), request.ackTimeout(), listener);
3046
}
3147

3248
@SuppressWarnings("unchecked")
3349
protected AckedClusterStateUpdateTask(
3450
Priority priority,
35-
AckedRequest request,
51+
TimeValue masterNodeTimeout,
52+
TimeValue ackTimeout,
3653
ActionListener<? extends AcknowledgedResponse> listener
3754
) {
38-
super(priority, request.masterNodeTimeout());
55+
super(priority, masterNodeTimeout);
3956
this.listener = (ActionListener<AcknowledgedResponse>) listener;
40-
this.request = request;
57+
this.ackTimeout = ackTimeout;
4158
}
4259

4360
/**
@@ -81,6 +98,6 @@ public void onFailure(Exception e) {
8198
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
8299
*/
83100
public final TimeValue ackTimeout() {
84-
return request.ackTimeout();
101+
return ackTimeout;
85102
}
86103
}

server/src/main/java/org/elasticsearch/cluster/ack/AckedRequest.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
* Base class to be used when needing to update the cluster state
1616
* Contains the basic fields that are always needed
1717
*/
18-
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> implements AckedRequest {
18+
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> {
1919

2020
private TimeValue ackTimeout;
2121
private TimeValue masterNodeTimeout;
2222

2323
/**
2424
* Returns the maximum time interval to wait for acknowledgements
2525
*/
26-
@Override
2726
public TimeValue ackTimeout() {
2827
return ackTimeout;
2928
}
@@ -41,7 +40,6 @@ public T ackTimeout(TimeValue ackTimeout) {
4140
* Returns the maximum time interval to wait for the request to
4241
* be completed on the master node
4342
*/
44-
@Override
4543
public TimeValue masterNodeTimeout() {
4644
return masterNodeTimeout;
4745
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ public void createDataStream(CreateDataStreamClusterStateUpdateRequest request,
9999
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
100100
submitUnbatchedTask(
101101
"create-data-stream [" + request.name + "]",
102-
new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
102+
new AckedClusterStateUpdateTask(
103+
Priority.HIGH,
104+
request.masterNodeTimeout(),
105+
request.ackTimeout(),
106+
delegate.clusterStateUpdate()
107+
) {
103108
@Override
104109
public ClusterState execute(ClusterState currentState) throws Exception {
105110
// When we're manually creating a data stream (i.e. not an auto creation), we don't need to initialize the failure store

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,12 @@ private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
296296
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
297297
submitUnbatchedTask(
298298
"create-index [" + request.index() + "], cause [" + request.cause() + "]",
299-
new AckedClusterStateUpdateTask(Priority.URGENT, request, delegate.clusterStateUpdate()) {
299+
new AckedClusterStateUpdateTask(
300+
Priority.URGENT,
301+
request.masterNodeTimeout(),
302+
request.ackTimeout(),
303+
delegate.clusterStateUpdate()
304+
) {
300305

301306
@Override
302307
public ClusterState execute(ClusterState currentState) throws Exception {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,12 @@ public void migrateToDataStream(
105105
var delegate = new AllocationActionListener<>(listener, threadContext);
106106
submitUnbatchedTask(
107107
"migrate-to-data-stream [" + request.aliasName + "]",
108-
new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
108+
new AckedClusterStateUpdateTask(
109+
Priority.HIGH,
110+
request.masterNodeTimeout(),
111+
request.ackTimeout(),
112+
delegate.clusterStateUpdate()
113+
) {
109114

110115
@Override
111116
public ClusterState execute(ClusterState currentState) throws Exception {

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.ElasticsearchException;
1616
import org.elasticsearch.action.ActionListener;
1717
import org.elasticsearch.action.support.PlainActionFuture;
18+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1819
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1920
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
2021
import org.elasticsearch.cluster.ClusterName;
@@ -26,7 +27,6 @@
2627
import org.elasticsearch.cluster.LocalMasterServiceTask;
2728
import org.elasticsearch.cluster.NotMasterException;
2829
import org.elasticsearch.cluster.SimpleBatchedExecutor;
29-
import org.elasticsearch.cluster.ack.AckedRequest;
3030
import org.elasticsearch.cluster.block.ClusterBlocks;
3131
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
3232
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@@ -1570,7 +1570,7 @@ public void onAckFailure(Exception e) {
15701570

15711571
masterService.submitUnbatchedStateUpdateTask(
15721572
"test2",
1573-
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, null), null) {
1573+
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, TimeValue.MINUS_ONE), null) {
15741574
@Override
15751575
public ClusterState execute(ClusterState currentState) {
15761576
return ClusterState.builder(currentState).build();
@@ -1622,7 +1622,7 @@ public void onAckTimeout() {
16221622

16231623
masterService.submitUnbatchedStateUpdateTask(
16241624
"test2",
1625-
new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, null), null) {
1625+
new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, TimeValue.MINUS_ONE), null) {
16261626
@Override
16271627
public ClusterState execute(ClusterState currentState) {
16281628
threadPool.getThreadContext().addResponseHeader(responseHeaderName, responseHeaderValue);
@@ -1677,7 +1677,7 @@ public void onAckTimeout() {
16771677

16781678
masterService.submitUnbatchedStateUpdateTask(
16791679
"test2",
1680-
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, null), null) {
1680+
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.MINUS_ONE, TimeValue.MINUS_ONE), null) {
16811681
@Override
16821682
public ClusterState execute(ClusterState currentState) {
16831683
return ClusterState.builder(currentState).build();
@@ -2656,20 +2656,15 @@ public static ClusterState discoveryState(MasterService masterService) {
26562656
}
26572657

26582658
/**
2659-
* Returns a plain {@link AckedRequest} that does not implement any functionality outside of the timeout getters.
2659+
* Returns a plain {@link AcknowledgedRequest} that does not implement any functionality outside of the timeout getters.
26602660
*/
2661-
public static AckedRequest ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) {
2662-
return new AckedRequest() {
2663-
@Override
2664-
public TimeValue ackTimeout() {
2665-
return ackTimeout;
2666-
}
2667-
2668-
@Override
2669-
public TimeValue masterNodeTimeout() {
2670-
return masterNodeTimeout;
2661+
public static AcknowledgedRequest<?> ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) {
2662+
class BareAcknowledgedRequest extends AcknowledgedRequest<BareAcknowledgedRequest> {
2663+
BareAcknowledgedRequest() {
2664+
super(masterNodeTimeout, ackTimeout);
26712665
}
2672-
};
2666+
}
2667+
return new BareAcknowledgedRequest();
26732668
}
26742669

26752670
/**

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/OperationModeUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1213
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1314
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1415
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.ClusterStateUpdateTask;
16-
import org.elasticsearch.cluster.ack.AckedRequest;
1717
import org.elasticsearch.cluster.metadata.Metadata;
1818
import org.elasticsearch.common.Priority;
1919
import org.elasticsearch.core.Nullable;
@@ -36,7 +36,7 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
3636

3737
public static AckedClusterStateUpdateTask wrap(
3838
OperationModeUpdateTask task,
39-
AckedRequest request,
39+
AcknowledgedRequest<?> request,
4040
ActionListener<AcknowledgedResponse> listener
4141
) {
4242
return new AckedClusterStateUpdateTask(task.priority(), request, listener) {

0 commit comments

Comments
 (0)