Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/125214.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125214
summary: Run `TransportGetDataStreamLifecycleAction` on local node
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.support.CancellableActionTestPlugin;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
Expand Down Expand Up @@ -64,6 +65,13 @@ public void testGetDataStreamCancellation() {
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
}

public void testGetDataStreamLifecycleCancellation() {
runRestActionCancellationTest(
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_lifecycle"),
GetDataStreamLifecycleAction.INSTANCE.name()
);
}

private void runRestActionCancellationTest(Request request, String actionName) {
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -24,9 +25,10 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Comparator;
Expand All @@ -38,41 +40,52 @@
* Collects the data streams from the cluster state, filters the ones that do not have a data stream lifecycle configured and then returns
* a list of the data stream name and respective lifecycle configuration.
*/
public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeReadProjectAction<
public class TransportGetDataStreamLifecycleAction extends TransportLocalProjectMetadataAction<
GetDataStreamLifecycleAction.Request,
GetDataStreamLifecycleAction.Response> {
private final ClusterSettings clusterSettings;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DataStreamGlobalRetentionSettings globalRetentionSettings;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetDataStreamLifecycleAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
ProjectResolver projectResolver,
IndexNameExpressionResolver indexNameExpressionResolver,
DataStreamGlobalRetentionSettings globalRetentionSettings
) {
super(
GetDataStreamLifecycleAction.INSTANCE.name(),
transportService,
clusterService,
threadPool,
actionFilters,
GetDataStreamLifecycleAction.Request::new,
projectResolver,
GetDataStreamLifecycleAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE,
projectResolver
);
clusterSettings = clusterService.getClusterSettings();
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.globalRetentionSettings = globalRetentionSettings;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetDataStreamLifecycleAction.Request::new,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
GetDataStreamLifecycleAction.Request request,
ProjectState state,
Expand All @@ -86,6 +99,7 @@ protected void masterOperation(
);
Map<String, DataStream> dataStreams = state.metadata().dataStreams();

((CancellableTask) task).ensureNotCancelled();
listener.onResponse(
new GetDataStreamLifecycleAction.Response(
results.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;

import java.util.List;
Expand Down Expand Up @@ -46,7 +47,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions()));
return channel -> client.execute(
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetDataStreamLifecycleAction.INSTANCE,
getDataLifecycleRequest,
new RestRefCountedChunkedToXContentListener<>(channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -24,6 +24,10 @@
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -33,6 +37,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
Expand All @@ -44,7 +49,7 @@ public class GetDataStreamLifecycleAction {

private GetDataStreamLifecycleAction() {/* no instances */}

public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {

private String[] names;
private IndicesOptions indicesOptions = IndicesOptions.builder()
Expand Down Expand Up @@ -89,21 +94,23 @@ public ActionRequestValidationException validate() {
return null;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readOptionalStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
this.includeDefaults = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(names);
indicesOptions.writeIndicesOptions(out);
out.writeBoolean(includeDefaults);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -169,14 +176,11 @@ public record DataStreamLifecycle(
public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle");

DataStreamLifecycle(StreamInput in) throws IOException {
this(
in.readString(),
in.readOptionalWriteable(org.elasticsearch.cluster.metadata.DataStreamLifecycle::new),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) && in.readBoolean()
);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(dataStreamName);
Expand Down Expand Up @@ -238,16 +242,6 @@ public Response(
this.globalRetention = globalRetention;
}

public Response(StreamInput in) throws IOException {
this(
in.readCollectionAsList(DataStreamLifecycle::new),
in.readOptionalWriteable(RolloverConfiguration::new),
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
? in.readOptionalWriteable(DataStreamGlobalRetention::read)
: null
);
}

public List<DataStreamLifecycle> getDataStreamLifecycles() {
return dataStreamLifecycles;
}
Expand All @@ -261,6 +255,11 @@ public DataStreamGlobalRetention getGlobalRetention() {
return globalRetention;
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(dataStreamLifecycles);
Expand Down
Loading