Skip to content

Commit 6ff138f

Browse files
authored
Drop useless AckedRequest interface (#113255)
Almost every implementation of `AckedRequest` is an `AcknowledgedRequest` too, and the distinction is rather confusing. Moreover the other implementations of `AckedRequest` are a potential source of `null` timeouts that we'd like to get rid of. This commit simplifies the situation by dropping the unnecessary `AckedRequest` interface entirely.
1 parent 5530caa commit 6ff138f

File tree

11 files changed

+86
-113
lines changed

11 files changed

+86
-113
lines changed

qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/RestClusterInfoActionCancellationIT.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.client.Response;
2020
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
2121
import org.elasticsearch.cluster.ClusterState;
22-
import org.elasticsearch.cluster.ack.AckedRequest;
2322
import org.elasticsearch.cluster.block.ClusterBlock;
2423
import org.elasticsearch.cluster.block.ClusterBlockLevel;
2524
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -102,22 +101,9 @@ private void runTest(String actionName, String endpoint) throws Exception {
102101

103102
private void updateClusterState(Function<ClusterState, ClusterState> transformationFn) {
104103
final TimeValue timeout = TimeValue.timeValueSeconds(10);
105-
106-
final AckedRequest ackedRequest = new AckedRequest() {
107-
@Override
108-
public TimeValue ackTimeout() {
109-
return timeout;
110-
}
111-
112-
@Override
113-
public TimeValue masterNodeTimeout() {
114-
return timeout;
115-
}
116-
};
117-
118104
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
119105
internalCluster().getAnyMasterNodeInstance(ClusterService.class)
120-
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(ackedRequest, future) {
106+
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(timeout, timeout, future) {
121107
@Override
122108
public ClusterState execute(ClusterState currentState) throws Exception {
123109
return transformationFn.apply(currentState);

server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.action.support.master;
1010

1111
import org.elasticsearch.action.ActionRequestValidationException;
12-
import org.elasticsearch.cluster.ack.AckedRequest;
1312
import org.elasticsearch.common.io.stream.StreamInput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514
import org.elasticsearch.core.TimeValue;
@@ -23,9 +22,7 @@
2322
* Abstract base class for action requests that track acknowledgements of cluster state updates: such a request is acknowledged only once
2423
* the cluster state update is committed and all relevant nodes have applied it and acknowledged its application to the elected master..
2524
*/
26-
public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Request>> extends MasterNodeRequest<Request>
27-
implements
28-
AckedRequest {
25+
public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Request>> extends MasterNodeRequest<Request> {
2926

3027
public static final TimeValue DEFAULT_ACK_TIMEOUT = timeValueSeconds(30);
3128

@@ -74,7 +71,6 @@ public final Request ackTimeout(TimeValue ackTimeout) {
7471
/**
7572
* @return the current ack timeout as a {@link TimeValue}
7673
*/
77-
@Override
7874
public final TimeValue ackTimeout() {
7975
return ackTimeout;
8076
}

server/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
package org.elasticsearch.cluster;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1213
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13-
import org.elasticsearch.cluster.ack.AckedRequest;
1414
import org.elasticsearch.cluster.node.DiscoveryNode;
1515
import org.elasticsearch.common.Priority;
1616
import org.elasticsearch.core.TimeValue;
@@ -23,21 +23,38 @@
2323
public abstract class AckedClusterStateUpdateTask extends ClusterStateUpdateTask implements ClusterStateAckListener {
2424

2525
private final ActionListener<AcknowledgedResponse> listener;
26-
private final AckedRequest request;
26+
private final TimeValue ackTimeout;
2727

28-
protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<? extends AcknowledgedResponse> listener) {
29-
this(Priority.NORMAL, request, listener);
28+
protected AckedClusterStateUpdateTask(AcknowledgedRequest<?> request, ActionListener<? extends AcknowledgedResponse> listener) {
29+
this(Priority.NORMAL, request.masterNodeTimeout(), request.ackTimeout(), listener);
30+
}
31+
32+
protected AckedClusterStateUpdateTask(
33+
TimeValue masterNodeTimeout,
34+
TimeValue ackTimeout,
35+
ActionListener<? extends AcknowledgedResponse> listener
36+
) {
37+
this(Priority.NORMAL, masterNodeTimeout, ackTimeout, listener);
38+
}
39+
40+
protected AckedClusterStateUpdateTask(
41+
Priority priority,
42+
AcknowledgedRequest<?> request,
43+
ActionListener<? extends AcknowledgedResponse> listener
44+
) {
45+
this(priority, request.masterNodeTimeout(), request.ackTimeout(), listener);
3046
}
3147

3248
@SuppressWarnings("unchecked")
3349
protected AckedClusterStateUpdateTask(
3450
Priority priority,
35-
AckedRequest request,
51+
TimeValue masterNodeTimeout,
52+
TimeValue ackTimeout,
3653
ActionListener<? extends AcknowledgedResponse> listener
3754
) {
38-
super(priority, request.masterNodeTimeout());
55+
super(priority, masterNodeTimeout);
3956
this.listener = (ActionListener<AcknowledgedResponse>) listener;
40-
this.request = request;
57+
this.ackTimeout = ackTimeout;
4158
}
4259

4360
/**
@@ -81,6 +98,6 @@ public void onFailure(Exception e) {
8198
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
8299
*/
83100
public final TimeValue ackTimeout() {
84-
return request.ackTimeout();
101+
return ackTimeout;
85102
}
86103
}

server/src/main/java/org/elasticsearch/cluster/ack/AckedRequest.java

Lines changed: 0 additions & 28 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/cluster/ack/ClusterStateUpdateRequest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
* Base class to be used when needing to update the cluster state
1616
* Contains the basic fields that are always needed
1717
*/
18-
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> implements AckedRequest {
18+
public abstract class ClusterStateUpdateRequest<T extends ClusterStateUpdateRequest<T>> {
1919

2020
private TimeValue ackTimeout;
2121
private TimeValue masterNodeTimeout;
2222

2323
/**
2424
* Returns the maximum time interval to wait for acknowledgements
2525
*/
26-
@Override
2726
public TimeValue ackTimeout() {
2827
return ackTimeout;
2928
}
@@ -41,7 +40,6 @@ public T ackTimeout(TimeValue ackTimeout) {
4140
* Returns the maximum time interval to wait for the request to
4241
* be completed on the master node
4342
*/
44-
@Override
4543
public TimeValue masterNodeTimeout() {
4644
return masterNodeTimeout;
4745
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,12 @@ public void createDataStream(CreateDataStreamClusterStateUpdateRequest request,
9999
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
100100
submitUnbatchedTask(
101101
"create-data-stream [" + request.name + "]",
102-
new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
102+
new AckedClusterStateUpdateTask(
103+
Priority.HIGH,
104+
request.masterNodeTimeout(),
105+
request.ackTimeout(),
106+
delegate.clusterStateUpdate()
107+
) {
103108
@Override
104109
public ClusterState execute(ClusterState currentState) throws Exception {
105110
// When we're manually creating a data stream (i.e. not an auto creation), we don't need to initialize the failure store

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,12 @@ private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request,
296296
var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext());
297297
submitUnbatchedTask(
298298
"create-index [" + request.index() + "], cause [" + request.cause() + "]",
299-
new AckedClusterStateUpdateTask(Priority.URGENT, request, delegate.clusterStateUpdate()) {
299+
new AckedClusterStateUpdateTask(
300+
Priority.URGENT,
301+
request.masterNodeTimeout(),
302+
request.ackTimeout(),
303+
delegate.clusterStateUpdate()
304+
) {
300305

301306
@Override
302307
public ClusterState execute(ClusterState currentState) throws Exception {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,12 @@ public void migrateToDataStream(
105105
var delegate = new AllocationActionListener<>(listener, threadContext);
106106
submitUnbatchedTask(
107107
"migrate-to-data-stream [" + request.aliasName + "]",
108-
new AckedClusterStateUpdateTask(Priority.HIGH, request, delegate.clusterStateUpdate()) {
108+
new AckedClusterStateUpdateTask(
109+
Priority.HIGH,
110+
request.masterNodeTimeout(),
111+
request.ackTimeout(),
112+
delegate.clusterStateUpdate()
113+
) {
109114

110115
@Override
111116
public ClusterState execute(ClusterState currentState) throws Exception {

server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.ElasticsearchException;
1616
import org.elasticsearch.action.ActionListener;
1717
import org.elasticsearch.action.support.PlainActionFuture;
18+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1819
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1920
import org.elasticsearch.action.support.master.MasterNodeRequest;
2021
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -27,7 +28,6 @@
2728
import org.elasticsearch.cluster.LocalMasterServiceTask;
2829
import org.elasticsearch.cluster.NotMasterException;
2930
import org.elasticsearch.cluster.SimpleBatchedExecutor;
30-
import org.elasticsearch.cluster.ack.AckedRequest;
3131
import org.elasticsearch.cluster.block.ClusterBlocks;
3232
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
3333
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
@@ -1571,7 +1571,7 @@ public void onAckFailure(Exception e) {
15711571

15721572
masterService.submitUnbatchedStateUpdateTask(
15731573
"test2",
1574-
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, null), null) {
1574+
new AckedClusterStateUpdateTask(ackedRequest(TimeValue.ZERO, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT), null) {
15751575
@Override
15761576
public ClusterState execute(ClusterState currentState) {
15771577
return ClusterState.builder(currentState).build();
@@ -1623,7 +1623,7 @@ public void onAckTimeout() {
16231623

16241624
masterService.submitUnbatchedStateUpdateTask(
16251625
"test2",
1626-
new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, null), null) {
1626+
new AckedClusterStateUpdateTask(ackedRequest(ackTimeout, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT), null) {
16271627
@Override
16281628
public ClusterState execute(ClusterState currentState) {
16291629
threadPool.getThreadContext().addResponseHeader(responseHeaderName, responseHeaderValue);
@@ -1678,7 +1678,10 @@ public void onAckTimeout() {
16781678

16791679
masterService.submitUnbatchedStateUpdateTask(
16801680
"test2",
1681-
new AckedClusterStateUpdateTask(ackedRequest(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, null), null) {
1681+
new AckedClusterStateUpdateTask(
1682+
ackedRequest(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT),
1683+
null
1684+
) {
16821685
@Override
16831686
public ClusterState execute(ClusterState currentState) {
16841687
return ClusterState.builder(currentState).build();
@@ -2657,20 +2660,15 @@ public static ClusterState discoveryState(MasterService masterService) {
26572660
}
26582661

26592662
/**
2660-
* Returns a plain {@link AckedRequest} that does not implement any functionality outside of the timeout getters.
2663+
* Returns a plain {@link AcknowledgedRequest} that does not implement any functionality outside of the timeout getters.
26612664
*/
2662-
public static AckedRequest ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) {
2663-
return new AckedRequest() {
2664-
@Override
2665-
public TimeValue ackTimeout() {
2666-
return ackTimeout;
2667-
}
2668-
2669-
@Override
2670-
public TimeValue masterNodeTimeout() {
2671-
return masterNodeTimeout;
2665+
public static AcknowledgedRequest<?> ackedRequest(TimeValue ackTimeout, TimeValue masterNodeTimeout) {
2666+
class BareAcknowledgedRequest extends AcknowledgedRequest<BareAcknowledgedRequest> {
2667+
BareAcknowledgedRequest() {
2668+
super(masterNodeTimeout, ackTimeout);
26722669
}
2673-
};
2670+
}
2671+
return new BareAcknowledgedRequest();
26742672
}
26752673

26762674
/**

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/OperationModeUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1213
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1314
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1415
import org.elasticsearch.cluster.ClusterState;
1516
import org.elasticsearch.cluster.ClusterStateUpdateTask;
16-
import org.elasticsearch.cluster.ack.AckedRequest;
1717
import org.elasticsearch.cluster.metadata.Metadata;
1818
import org.elasticsearch.common.Priority;
1919
import org.elasticsearch.core.Nullable;
@@ -36,7 +36,7 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
3636

3737
public static AckedClusterStateUpdateTask wrap(
3838
OperationModeUpdateTask task,
39-
AckedRequest request,
39+
AcknowledgedRequest<?> request,
4040
ActionListener<AcknowledgedResponse> listener
4141
) {
4242
return new AckedClusterStateUpdateTask(task.priority(), request, listener) {

0 commit comments

Comments
 (0)