Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125213.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125213
summary: Run `TransportGetDataStreamOptionsAction` on local node
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.options.action.GetDataStreamOptionsAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.root.MainRestPlugin;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -72,6 +73,13 @@ public void testGetDataStreamLifecycleCancellation() {
);
}

public void testGetDataStreamOptionsCancellation() {
runRestActionCancellationTest(
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_options"),
GetDataStreamOptionsAction.INSTANCE.name()
);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -30,6 +34,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -42,7 +47,7 @@ public class GetDataStreamOptionsAction {

private GetDataStreamOptionsAction() {/* no instances */}

public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {

private String[] names;
private IndicesOptions indicesOptions = IndicesOptions.builder()
Expand Down Expand Up @@ -76,21 +81,23 @@ public ActionRequestValidationException validate() {
return null;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readOptionalStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
this.includeDefaults = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(names);
indicesOptions.writeIndicesOptions(out);
out.writeBoolean(includeDefaults);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -152,10 +159,11 @@ public record DataStreamEntry(String dataStreamName, DataStreamOptions dataStrea
public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField OPTIONS_FIELD = new ParseField("options");

DataStreamEntry(StreamInput in) throws IOException {
this(in.readString(), in.readOptionalWriteable(DataStreamOptions::read));
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(dataStreamName);
Expand All @@ -180,14 +188,15 @@ public Response(List<DataStreamEntry> dataStreams) {
this.dataStreams = dataStreams;
}

public Response(StreamInput in) throws IOException {
this(in.readCollectionAsList(DataStreamEntry::new));
}

public List<DataStreamEntry> getDataStreams() {
return dataStreams;
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(dataStreams);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -20,8 +21,10 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -35,13 +38,20 @@
* Collects the data streams from the cluster state and then returns for each data stream its name and its
* data stream options. Currently, data stream options include only the failure store configuration.
*/
public class TransportGetDataStreamOptionsAction extends TransportMasterNodeReadProjectAction<
public class TransportGetDataStreamOptionsAction extends TransportLocalProjectMetadataAction<
GetDataStreamOptionsAction.Request,
GetDataStreamOptionsAction.Response> {

private final IndexNameExpressionResolver indexNameExpressionResolver;
private final SystemIndices systemIndices;
private final ThreadPool threadPool;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetDataStreamOptionsAction(
TransportService transportService,
Expand All @@ -54,26 +64,34 @@ public TransportGetDataStreamOptionsAction(
) {
super(
GetDataStreamOptionsAction.INSTANCE.name(),
transportService,
clusterService,
threadPool,
actionFilters,
GetDataStreamOptionsAction.Request::new,
projectResolver,
GetDataStreamOptionsAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
projectResolver
);
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.systemIndices = systemIndices;
this.threadPool = threadPool;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetDataStreamOptionsAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
GetDataStreamOptionsAction.Request request,
ProjectState state,
ActionListener<GetDataStreamOptionsAction.Response> listener
) {
((CancellableTask) task).ensureNotCancelled();
List<String> requestedDataStreams = DataStreamsActionUtil.getDataStreamNames(
indexNameExpressionResolver,
state.metadata(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;

import java.util.List;
Expand Down Expand Up @@ -44,7 +45,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
getDataStreamOptionsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
getDataStreamOptionsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamOptionsRequest.indicesOptions()));
return channel -> client.execute(
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetDataStreamOptionsAction.INSTANCE,
getDataStreamOptionsRequest,
new RestRefCountedChunkedToXContentListener<>(channel)
Expand Down
Loading