From 89867d5219eda6a6f0d70615e70b921aaf603157 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Sat, 15 Feb 2025 18:10:59 +1000 Subject: [PATCH 1/2] Run `TransportGetDataStreamOptionsAction` on local node This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805 --- .../DataStreamRestActionCancellationIT.java | 8 ++++ .../action/GetDataStreamOptionsAction.java | 45 +++++++++++-------- .../TransportGetDataStreamOptionsAction.java | 38 +++++++++++----- .../rest/RestGetDataStreamOptionsAction.java | 3 +- 4 files changed, 65 insertions(+), 29 deletions(-) diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java index 55835143fdca9..47ad93fe42407 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.root.MainRestPlugin; import org.elasticsearch.test.ESIntegTestCase; @@ -64,6 +65,13 @@ public void testGetDataStreamCancellation() { runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME); } + public void testGetDataStreamOptionsCancellation() { + runRestActionCancellationTest( + new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_options"), + GetDataStreamOptionsAction.INSTANCE.name() + ); + } + private void runRestActionCancellationTest(Request request, String actionName) { final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java index 45bda1abd5c02..b55a3a08617ce 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/GetDataStreamOptionsAction.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; @@ -21,6 +21,10 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ChunkedToXContentObject; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; @@ -30,6 +34,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -42,7 +47,7 @@ public class GetDataStreamOptionsAction { private GetDataStreamOptionsAction() {/* no instances */} - public static class Request extends MasterNodeReadRequest implements IndicesRequest.Replaceable { + public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable { private String[] names; private IndicesOptions indicesOptions = IndicesOptions.builder() @@ -76,6 +81,16 @@ public ActionRequestValidationException validate() { return null; } + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) public Request(StreamInput in) throws IOException { super(in); this.names = in.readOptionalStringArray(); @@ -83,14 +98,6 @@ public Request(StreamInput in) throws IOException { this.includeDefaults = in.readBoolean(); } - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeOptionalStringArray(names); - indicesOptions.writeIndicesOptions(out); - out.writeBoolean(includeDefaults); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -152,10 +159,11 @@ public record DataStreamEntry(String dataStreamName, DataStreamOptions dataStrea public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField OPTIONS_FIELD = new ParseField("options"); - DataStreamEntry(StreamInput in) throws IOException { - this(in.readString(), in.readOptionalWriteable(DataStreamOptions::read)); - } - + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(dataStreamName); @@ -180,14 +188,15 @@ public Response(List dataStreams) { this.dataStreams = dataStreams; } - public Response(StreamInput in) throws IOException { - this(in.readCollectionAsList(DataStreamEntry::new)); - } - public List getDataStreams() { return dataStreams; } + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(dataStreams); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java index c5f6317caca8e..3bb0907c5f43d 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/action/TransportGetDataStreamOptionsAction.java @@ -11,7 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.datastreams.DataStreamsActionUtil; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -20,8 +21,10 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -35,13 +38,20 @@ * Collects the data streams from the cluster state and then returns for each data stream its name and its * data stream options. Currently, data stream options include only the failure store configuration. */ -public class TransportGetDataStreamOptionsAction extends TransportMasterNodeReadProjectAction< +public class TransportGetDataStreamOptionsAction extends TransportLocalProjectMetadataAction< GetDataStreamOptionsAction.Request, GetDataStreamOptionsAction.Response> { private final IndexNameExpressionResolver indexNameExpressionResolver; private final SystemIndices systemIndices; + private final ThreadPool threadPool; + /** + * NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until + * we no longer need to support calling this action remotely. + */ + @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) + @SuppressWarnings("this-escape") @Inject public TransportGetDataStreamOptionsAction( TransportService transportService, @@ -54,26 +64,34 @@ public TransportGetDataStreamOptionsAction( ) { super( GetDataStreamOptionsAction.INSTANCE.name(), - transportService, - clusterService, - threadPool, actionFilters, - GetDataStreamOptionsAction.Request::new, - projectResolver, - GetDataStreamOptionsAction.Response::new, - EsExecutors.DIRECT_EXECUTOR_SERVICE + transportService.getTaskManager(), + clusterService, + EsExecutors.DIRECT_EXECUTOR_SERVICE, + projectResolver ); this.indexNameExpressionResolver = indexNameExpressionResolver; this.systemIndices = systemIndices; + this.threadPool = threadPool; + + transportService.registerRequestHandler( + actionName, + executor, + false, + true, + GetDataStreamOptionsAction.Request::new, + (request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel)) + ); } @Override - protected void masterOperation( + protected void localClusterStateOperation( Task task, GetDataStreamOptionsAction.Request request, ProjectState state, ActionListener listener ) { + ((CancellableTask) task).ensureNotCancelled(); List requestedDataStreams = DataStreamsActionUtil.getDataStreamNames( indexNameExpressionResolver, state.metadata(), diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java index 6d6530efce1b9..c47282d4d316f 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/options/rest/RestGetDataStreamOptionsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; import java.util.List; @@ -44,7 +45,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ); getDataStreamOptionsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false)); getDataStreamOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamOptionsRequest.indicesOptions())); - return channel -> client.execute( + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( GetDataStreamOptionsAction.INSTANCE, getDataStreamOptionsRequest, new RestRefCountedChunkedToXContentListener<>(channel) From 28a27764aa47eb578c11cb71acdccd7224736b6c Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Wed, 19 Mar 2025 13:50:17 +0100 Subject: [PATCH 2/2] Update docs/changelog/125213.yaml --- docs/changelog/125213.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/125213.yaml diff --git a/docs/changelog/125213.yaml b/docs/changelog/125213.yaml new file mode 100644 index 0000000000000..3793e83d1e162 --- /dev/null +++ b/docs/changelog/125213.yaml @@ -0,0 +1,5 @@ +pr: 125213 +summary: Run `TransportGetDataStreamOptionsAction` on local node +area: Data streams +type: enhancement +issues: []