Skip to content

Commit 89867d5

Browse files
committed
Run TransportGetDataStreamOptionsAction on local node
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805
1 parent 8a741bf commit 89867d5

File tree

4 files changed

+65
-29
lines changed

4 files changed

+65
-29
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.Strings;
2222
import org.elasticsearch.common.network.NetworkModule;
2323
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
2425
import org.elasticsearch.plugins.Plugin;
2526
import org.elasticsearch.rest.root.MainRestPlugin;
2627
import org.elasticsearch.test.ESIntegTestCase;
@@ -64,6 +65,13 @@ public void testGetDataStreamCancellation() {
6465
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
6566
}
6667

68+
public void testGetDataStreamOptionsCancellation() {
69+
runRestActionCancellationTest(
70+
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_options"),
71+
GetDataStreamOptionsAction.INSTANCE.name()
72+
);
73+
}
74+
6775
private void runRestActionCancellationTest(Request request, String actionName) {
6876
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
6977

modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.IndicesRequest;
1515
import org.elasticsearch.action.support.IndicesOptions;
16-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
16+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1717
import org.elasticsearch.cluster.metadata.DataStreamOptions;
1818
import org.elasticsearch.common.collect.Iterators;
1919
import org.elasticsearch.common.io.stream.StreamInput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.common.io.stream.Writeable;
2222
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
2323
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.core.UpdateForV10;
25+
import org.elasticsearch.tasks.CancellableTask;
26+
import org.elasticsearch.tasks.Task;
27+
import org.elasticsearch.tasks.TaskId;
2428
import org.elasticsearch.xcontent.ParseField;
2529
import org.elasticsearch.xcontent.ToXContent;
2630
import org.elasticsearch.xcontent.ToXContentObject;
@@ -30,6 +34,7 @@
3034
import java.util.Arrays;
3135
import java.util.Iterator;
3236
import java.util.List;
37+
import java.util.Map;
3338
import java.util.Objects;
3439

3540
/**
@@ -42,7 +47,7 @@ public class GetDataStreamOptionsAction {
4247

4348
private GetDataStreamOptionsAction() {/* no instances */}
4449

45-
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
50+
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
4651

4752
private String[] names;
4853
private IndicesOptions indicesOptions = IndicesOptions.builder()
@@ -76,21 +81,23 @@ public ActionRequestValidationException validate() {
7681
return null;
7782
}
7883

84+
@Override
85+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
86+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
87+
}
88+
89+
/**
90+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
91+
* we no longer need to support calling this action remotely.
92+
*/
93+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7994
public Request(StreamInput in) throws IOException {
8095
super(in);
8196
this.names = in.readOptionalStringArray();
8297
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
8398
this.includeDefaults = in.readBoolean();
8499
}
85100

86-
@Override
87-
public void writeTo(StreamOutput out) throws IOException {
88-
super.writeTo(out);
89-
out.writeOptionalStringArray(names);
90-
indicesOptions.writeIndicesOptions(out);
91-
out.writeBoolean(includeDefaults);
92-
}
93-
94101
@Override
95102
public boolean equals(Object o) {
96103
if (this == o) return true;
@@ -152,10 +159,11 @@ public record DataStreamEntry(String dataStreamName, DataStreamOptions dataStrea
152159
public static final ParseField NAME_FIELD = new ParseField("name");
153160
public static final ParseField OPTIONS_FIELD = new ParseField("options");
154161

155-
DataStreamEntry(StreamInput in) throws IOException {
156-
this(in.readString(), in.readOptionalWriteable(DataStreamOptions::read));
157-
}
158-
162+
/**
163+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
164+
* we no longer need to support calling this action remotely.
165+
*/
166+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
159167
@Override
160168
public void writeTo(StreamOutput out) throws IOException {
161169
out.writeString(dataStreamName);
@@ -180,14 +188,15 @@ public Response(List<DataStreamEntry> dataStreams) {
180188
this.dataStreams = dataStreams;
181189
}
182190

183-
public Response(StreamInput in) throws IOException {
184-
this(in.readCollectionAsList(DataStreamEntry::new));
185-
}
186-
187191
public List<DataStreamEntry> getDataStreams() {
188192
return dataStreams;
189193
}
190194

195+
/**
196+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
197+
* we no longer need to support calling this action remotely.
198+
*/
199+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
191200
@Override
192201
public void writeTo(StreamOutput out) throws IOException {
193202
out.writeCollection(dataStreams);

modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
1313
import org.elasticsearch.action.support.ActionFilters;
14-
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
14+
import org.elasticsearch.action.support.ChannelActionListener;
15+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
1516
import org.elasticsearch.cluster.ProjectState;
1617
import org.elasticsearch.cluster.block.ClusterBlockException;
1718
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -20,8 +21,10 @@
2021
import org.elasticsearch.cluster.project.ProjectResolver;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.util.concurrent.EsExecutors;
24+
import org.elasticsearch.core.UpdateForV10;
2325
import org.elasticsearch.indices.SystemIndices;
2426
import org.elasticsearch.injection.guice.Inject;
27+
import org.elasticsearch.tasks.CancellableTask;
2528
import org.elasticsearch.tasks.Task;
2629
import org.elasticsearch.threadpool.ThreadPool;
2730
import org.elasticsearch.transport.TransportService;
@@ -35,13 +38,20 @@
3538
* Collects the data streams from the cluster state and then returns for each data stream its name and its
3639
* data stream options. Currently, data stream options include only the failure store configuration.
3740
*/
38-
public class TransportGetDataStreamOptionsAction extends TransportMasterNodeReadProjectAction<
41+
public class TransportGetDataStreamOptionsAction extends TransportLocalProjectMetadataAction<
3942
GetDataStreamOptionsAction.Request,
4043
GetDataStreamOptionsAction.Response> {
4144

4245
private final IndexNameExpressionResolver indexNameExpressionResolver;
4346
private final SystemIndices systemIndices;
47+
private final ThreadPool threadPool;
4448

49+
/**
50+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
51+
* we no longer need to support calling this action remotely.
52+
*/
53+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
54+
@SuppressWarnings("this-escape")
4555
@Inject
4656
public TransportGetDataStreamOptionsAction(
4757
TransportService transportService,
@@ -54,26 +64,34 @@ public TransportGetDataStreamOptionsAction(
5464
) {
5565
super(
5666
GetDataStreamOptionsAction.INSTANCE.name(),
57-
transportService,
58-
clusterService,
59-
threadPool,
6067
actionFilters,
61-
GetDataStreamOptionsAction.Request::new,
62-
projectResolver,
63-
GetDataStreamOptionsAction.Response::new,
64-
EsExecutors.DIRECT_EXECUTOR_SERVICE
68+
transportService.getTaskManager(),
69+
clusterService,
70+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
71+
projectResolver
6572
);
6673
this.indexNameExpressionResolver = indexNameExpressionResolver;
6774
this.systemIndices = systemIndices;
75+
this.threadPool = threadPool;
76+
77+
transportService.registerRequestHandler(
78+
actionName,
79+
executor,
80+
false,
81+
true,
82+
GetDataStreamOptionsAction.Request::new,
83+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
84+
);
6885
}
6986

7087
@Override
71-
protected void masterOperation(
88+
protected void localClusterStateOperation(
7289
Task task,
7390
GetDataStreamOptionsAction.Request request,
7491
ProjectState state,
7592
ActionListener<GetDataStreamOptionsAction.Response> listener
7693
) {
94+
((CancellableTask) task).ensureNotCancelled();
7795
List<String> requestedDataStreams = DataStreamsActionUtil.getDataStreamNames(
7896
indexNameExpressionResolver,
7997
state.metadata(),

modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.rest.RestUtils;
1818
import org.elasticsearch.rest.Scope;
1919
import org.elasticsearch.rest.ServerlessScope;
20+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2021
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
2122

2223
import java.util.List;
@@ -44,7 +45,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4445
);
4546
getDataStreamOptionsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
4647
getDataStreamOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamOptionsRequest.indicesOptions()));
47-
return channel -> client.execute(
48+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
4849
GetDataStreamOptionsAction.INSTANCE,
4950
getDataStreamOptionsRequest,
5051
new RestRefCountedChunkedToXContentListener<>(channel)

0 commit comments

Comments
 (0)