Skip to content

Commit e2869fd

Browse files
nielsbaumanomricohenn
authored andcommitted
Run TransportGetDataStreamOptionsAction on local node (elastic#125213)
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates elastic#101805
1 parent 62ffa5a commit e2869fd

File tree

5 files changed

+70
-29
lines changed

5 files changed

+70
-29
lines changed

docs/changelog/125213.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125213
2+
summary: Run `TransportGetDataStreamOptionsAction` on local node
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.Strings;
2323
import org.elasticsearch.common.network.NetworkModule;
2424
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
2526
import org.elasticsearch.plugins.Plugin;
2627
import org.elasticsearch.rest.root.MainRestPlugin;
2728
import org.elasticsearch.test.ESIntegTestCase;
@@ -72,6 +73,13 @@ public void testGetDataStreamLifecycleCancellation() {
7273
);
7374
}
7475

76+
public void testGetDataStreamOptionsCancellation() {
77+
runRestActionCancellationTest(
78+
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_options"),
79+
GetDataStreamOptionsAction.INSTANCE.name()
80+
);
81+
}
82+
7583
private void runRestActionCancellationTest(Request request, String actionName) {
7684
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
7785

modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.IndicesRequest;
1515
import org.elasticsearch.action.support.IndicesOptions;
16-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
16+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1717
import org.elasticsearch.cluster.metadata.DataStreamOptions;
1818
import org.elasticsearch.common.collect.Iterators;
1919
import org.elasticsearch.common.io.stream.StreamInput;
2020
import org.elasticsearch.common.io.stream.StreamOutput;
2121
import org.elasticsearch.common.io.stream.Writeable;
2222
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
2323
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.core.UpdateForV10;
25+
import org.elasticsearch.tasks.CancellableTask;
26+
import org.elasticsearch.tasks.Task;
27+
import org.elasticsearch.tasks.TaskId;
2428
import org.elasticsearch.xcontent.ParseField;
2529
import org.elasticsearch.xcontent.ToXContent;
2630
import org.elasticsearch.xcontent.ToXContentObject;
@@ -30,6 +34,7 @@
3034
import java.util.Arrays;
3135
import java.util.Iterator;
3236
import java.util.List;
37+
import java.util.Map;
3338
import java.util.Objects;
3439

3540
/**
@@ -42,7 +47,7 @@ public class GetDataStreamOptionsAction {
4247

4348
private GetDataStreamOptionsAction() {/* no instances */}
4449

45-
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
50+
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
4651

4752
private String[] names;
4853
private IndicesOptions indicesOptions = IndicesOptions.builder()
@@ -76,21 +81,23 @@ public ActionRequestValidationException validate() {
7681
return null;
7782
}
7883

84+
@Override
85+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
86+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
87+
}
88+
89+
/**
90+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
91+
* we no longer need to support calling this action remotely.
92+
*/
93+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7994
public Request(StreamInput in) throws IOException {
8095
super(in);
8196
this.names = in.readOptionalStringArray();
8297
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
8398
this.includeDefaults = in.readBoolean();
8499
}
85100

86-
@Override
87-
public void writeTo(StreamOutput out) throws IOException {
88-
super.writeTo(out);
89-
out.writeOptionalStringArray(names);
90-
indicesOptions.writeIndicesOptions(out);
91-
out.writeBoolean(includeDefaults);
92-
}
93-
94101
@Override
95102
public boolean equals(Object o) {
96103
if (this == o) return true;
@@ -152,10 +159,11 @@ public record DataStreamEntry(String dataStreamName, DataStreamOptions dataStrea
152159
public static final ParseField NAME_FIELD = new ParseField("name");
153160
public static final ParseField OPTIONS_FIELD = new ParseField("options");
154161

155-
DataStreamEntry(StreamInput in) throws IOException {
156-
this(in.readString(), in.readOptionalWriteable(DataStreamOptions::read));
157-
}
158-
162+
/**
163+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
164+
* we no longer need to support calling this action remotely.
165+
*/
166+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
159167
@Override
160168
public void writeTo(StreamOutput out) throws IOException {
161169
out.writeString(dataStreamName);
@@ -180,14 +188,15 @@ public Response(List<DataStreamEntry> dataStreams) {
180188
this.dataStreams = dataStreams;
181189
}
182190

183-
public Response(StreamInput in) throws IOException {
184-
this(in.readCollectionAsList(DataStreamEntry::new));
185-
}
186-
187191
public List<DataStreamEntry> getDataStreams() {
188192
return dataStreams;
189193
}
190194

195+
/**
196+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
197+
* we no longer need to support calling this action remotely.
198+
*/
199+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
191200
@Override
192201
public void writeTo(StreamOutput out) throws IOException {
193202
out.writeCollection(dataStreams);

modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
1313
import org.elasticsearch.action.support.ActionFilters;
14-
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
14+
import org.elasticsearch.action.support.ChannelActionListener;
15+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
1516
import org.elasticsearch.cluster.ProjectState;
1617
import org.elasticsearch.cluster.block.ClusterBlockException;
1718
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -20,8 +21,10 @@
2021
import org.elasticsearch.cluster.project.ProjectResolver;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.util.concurrent.EsExecutors;
24+
import org.elasticsearch.core.UpdateForV10;
2325
import org.elasticsearch.indices.SystemIndices;
2426
import org.elasticsearch.injection.guice.Inject;
27+
import org.elasticsearch.tasks.CancellableTask;
2528
import org.elasticsearch.tasks.Task;
2629
import org.elasticsearch.threadpool.ThreadPool;
2730
import org.elasticsearch.transport.TransportService;
@@ -35,13 +38,20 @@
3538
* Collects the data streams from the cluster state and then returns for each data stream its name and its
3639
* data stream options. Currently, data stream options include only the failure store configuration.
3740
*/
38-
public class TransportGetDataStreamOptionsAction extends TransportMasterNodeReadProjectAction<
41+
public class TransportGetDataStreamOptionsAction extends TransportLocalProjectMetadataAction<
3942
GetDataStreamOptionsAction.Request,
4043
GetDataStreamOptionsAction.Response> {
4144

4245
private final IndexNameExpressionResolver indexNameExpressionResolver;
4346
private final SystemIndices systemIndices;
47+
private final ThreadPool threadPool;
4448

49+
/**
50+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
51+
* we no longer need to support calling this action remotely.
52+
*/
53+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
54+
@SuppressWarnings("this-escape")
4555
@Inject
4656
public TransportGetDataStreamOptionsAction(
4757
TransportService transportService,
@@ -54,26 +64,34 @@ public TransportGetDataStreamOptionsAction(
5464
) {
5565
super(
5666
GetDataStreamOptionsAction.INSTANCE.name(),
57-
transportService,
58-
clusterService,
59-
threadPool,
6067
actionFilters,
61-
GetDataStreamOptionsAction.Request::new,
62-
projectResolver,
63-
GetDataStreamOptionsAction.Response::new,
64-
EsExecutors.DIRECT_EXECUTOR_SERVICE
68+
transportService.getTaskManager(),
69+
clusterService,
70+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
71+
projectResolver
6572
);
6673
this.indexNameExpressionResolver = indexNameExpressionResolver;
6774
this.systemIndices = systemIndices;
75+
this.threadPool = threadPool;
76+
77+
transportService.registerRequestHandler(
78+
actionName,
79+
executor,
80+
false,
81+
true,
82+
GetDataStreamOptionsAction.Request::new,
83+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
84+
);
6885
}
6986

7087
@Override
71-
protected void masterOperation(
88+
protected void localClusterStateOperation(
7289
Task task,
7390
GetDataStreamOptionsAction.Request request,
7491
ProjectState state,
7592
ActionListener<GetDataStreamOptionsAction.Response> listener
7693
) {
94+
((CancellableTask) task).ensureNotCancelled();
7795
List<String> requestedDataStreams = DataStreamsActionUtil.getDataStreamNames(
7896
indexNameExpressionResolver,
7997
state.metadata(),

modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.rest.RestUtils;
1818
import org.elasticsearch.rest.Scope;
1919
import org.elasticsearch.rest.ServerlessScope;
20+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2021
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
2122

2223
import java.util.List;
@@ -44,7 +45,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4445
);
4546
getDataStreamOptionsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
4647
getDataStreamOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamOptionsRequest.indicesOptions()));
47-
return channel -> client.execute(
48+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
4849
GetDataStreamOptionsAction.INSTANCE,
4950
getDataStreamOptionsRequest,
5051
new RestRefCountedChunkedToXContentListener<>(channel)

0 commit comments

Comments
 (0)