Skip to content

Commit 62ffa5a

Browse files
nielsbaumanomricohenn
authored andcommitted
Run TransportGetDataStreamLifecycleAction on local node (elastic#125214)
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates elastic#101805
1 parent 44817b6 commit 62ffa5a

File tree

5 files changed

+68
-41
lines changed

5 files changed

+68
-41
lines changed

docs/changelog/125214.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125214
2+
summary: Run `TransportGetDataStreamLifecycleAction` on local node
3+
area: Data streams
4+
type: enhancement
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.http.client.methods.HttpGet;
1313
import org.apache.http.client.methods.HttpPost;
1414
import org.elasticsearch.action.datastreams.GetDataStreamAction;
15+
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
1516
import org.elasticsearch.action.support.CancellableActionTestPlugin;
1617
import org.elasticsearch.action.support.PlainActionFuture;
1718
import org.elasticsearch.action.support.RefCountingListener;
@@ -64,6 +65,13 @@ public void testGetDataStreamCancellation() {
6465
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
6566
}
6667

68+
public void testGetDataStreamLifecycleCancellation() {
69+
runRestActionCancellationTest(
70+
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_lifecycle"),
71+
GetDataStreamLifecycleAction.INSTANCE.name()
72+
);
73+
}
74+
6775
private void runRestActionCancellationTest(Request request, String actionName) {
6876
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
6977

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
1313
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
1414
import org.elasticsearch.action.support.ActionFilters;
15-
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
15+
import org.elasticsearch.action.support.ChannelActionListener;
16+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
1617
import org.elasticsearch.cluster.ProjectState;
1718
import org.elasticsearch.cluster.block.ClusterBlockException;
1819
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -24,9 +25,10 @@
2425
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.settings.ClusterSettings;
2627
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.core.UpdateForV10;
2729
import org.elasticsearch.injection.guice.Inject;
30+
import org.elasticsearch.tasks.CancellableTask;
2831
import org.elasticsearch.tasks.Task;
29-
import org.elasticsearch.threadpool.ThreadPool;
3032
import org.elasticsearch.transport.TransportService;
3133

3234
import java.util.Comparator;
@@ -38,41 +40,52 @@
3840
* Collects the data streams from the cluster state, filters the ones that do not have a data stream lifecycle configured and then returns
3941
* a list of the data stream name and respective lifecycle configuration.
4042
*/
41-
public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeReadProjectAction<
43+
public class TransportGetDataStreamLifecycleAction extends TransportLocalProjectMetadataAction<
4244
GetDataStreamLifecycleAction.Request,
4345
GetDataStreamLifecycleAction.Response> {
4446
private final ClusterSettings clusterSettings;
4547
private final IndexNameExpressionResolver indexNameExpressionResolver;
4648
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
4749

50+
/**
51+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
52+
* we no longer need to support calling this action remotely.
53+
*/
54+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
55+
@SuppressWarnings("this-escape")
4856
@Inject
4957
public TransportGetDataStreamLifecycleAction(
5058
TransportService transportService,
5159
ClusterService clusterService,
52-
ThreadPool threadPool,
5360
ActionFilters actionFilters,
5461
ProjectResolver projectResolver,
5562
IndexNameExpressionResolver indexNameExpressionResolver,
5663
DataStreamGlobalRetentionSettings globalRetentionSettings
5764
) {
5865
super(
5966
GetDataStreamLifecycleAction.INSTANCE.name(),
60-
transportService,
61-
clusterService,
62-
threadPool,
6367
actionFilters,
64-
GetDataStreamLifecycleAction.Request::new,
65-
projectResolver,
66-
GetDataStreamLifecycleAction.Response::new,
67-
EsExecutors.DIRECT_EXECUTOR_SERVICE
68+
transportService.getTaskManager(),
69+
clusterService,
70+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
71+
projectResolver
6872
);
6973
clusterSettings = clusterService.getClusterSettings();
7074
this.indexNameExpressionResolver = indexNameExpressionResolver;
7175
this.globalRetentionSettings = globalRetentionSettings;
76+
77+
transportService.registerRequestHandler(
78+
actionName,
79+
executor,
80+
false,
81+
true,
82+
GetDataStreamLifecycleAction.Request::new,
83+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
84+
);
7285
}
7386

7487
@Override
75-
protected void masterOperation(
88+
protected void localClusterStateOperation(
7689
Task task,
7790
GetDataStreamLifecycleAction.Request request,
7891
ProjectState state,
@@ -86,6 +99,7 @@ protected void masterOperation(
8699
);
87100
Map<String, DataStream> dataStreams = state.metadata().dataStreams();
88101

102+
((CancellableTask) task).ensureNotCancelled();
89103
listener.onResponse(
90104
new GetDataStreamLifecycleAction.Response(
91105
results.stream()

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.rest.RestUtils;
1919
import org.elasticsearch.rest.Scope;
2020
import org.elasticsearch.rest.ServerlessScope;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2122
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
2223

2324
import java.util.List;
@@ -46,7 +47,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4647
);
4748
getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
4849
getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions()));
49-
return channel -> client.execute(
50+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
5051
GetDataStreamLifecycleAction.INSTANCE,
5152
getDataLifecycleRequest,
5253
new RestRefCountedChunkedToXContentListener<>(channel)

server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleAction.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.action.IndicesRequest;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
1717
import org.elasticsearch.action.support.IndicesOptions;
18-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
18+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1919
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
2020
import org.elasticsearch.common.collect.Iterators;
2121
import org.elasticsearch.common.io.stream.StreamInput;
@@ -24,6 +24,10 @@
2424
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
2525
import org.elasticsearch.core.Nullable;
2626
import org.elasticsearch.core.TimeValue;
27+
import org.elasticsearch.core.UpdateForV10;
28+
import org.elasticsearch.tasks.CancellableTask;
29+
import org.elasticsearch.tasks.Task;
30+
import org.elasticsearch.tasks.TaskId;
2731
import org.elasticsearch.xcontent.ParseField;
2832
import org.elasticsearch.xcontent.ToXContent;
2933
import org.elasticsearch.xcontent.ToXContentObject;
@@ -33,6 +37,7 @@
3337
import java.util.Arrays;
3438
import java.util.Iterator;
3539
import java.util.List;
40+
import java.util.Map;
3641
import java.util.Objects;
3742

3843
/**
@@ -44,7 +49,7 @@ public class GetDataStreamLifecycleAction {
4449

4550
private GetDataStreamLifecycleAction() {/* no instances */}
4651

47-
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
52+
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
4853

4954
private String[] names;
5055
private IndicesOptions indicesOptions = IndicesOptions.builder()
@@ -89,21 +94,23 @@ public ActionRequestValidationException validate() {
8994
return null;
9095
}
9196

97+
@Override
98+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
99+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
100+
}
101+
102+
/**
103+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
104+
* we no longer need to support calling this action remotely.
105+
*/
106+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
92107
public Request(StreamInput in) throws IOException {
93108
super(in);
94109
this.names = in.readOptionalStringArray();
95110
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
96111
this.includeDefaults = in.readBoolean();
97112
}
98113

99-
@Override
100-
public void writeTo(StreamOutput out) throws IOException {
101-
super.writeTo(out);
102-
out.writeOptionalStringArray(names);
103-
indicesOptions.writeIndicesOptions(out);
104-
out.writeBoolean(includeDefaults);
105-
}
106-
107114
@Override
108115
public boolean equals(Object o) {
109116
if (this == o) return true;
@@ -169,14 +176,11 @@ public record DataStreamLifecycle(
169176
public static final ParseField NAME_FIELD = new ParseField("name");
170177
public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle");
171178

172-
DataStreamLifecycle(StreamInput in) throws IOException {
173-
this(
174-
in.readString(),
175-
in.readOptionalWriteable(org.elasticsearch.cluster.metadata.DataStreamLifecycle::new),
176-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) && in.readBoolean()
177-
);
178-
}
179-
179+
/**
180+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
181+
* we no longer need to support calling this action remotely.
182+
*/
183+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
180184
@Override
181185
public void writeTo(StreamOutput out) throws IOException {
182186
out.writeString(dataStreamName);
@@ -238,16 +242,6 @@ public Response(
238242
this.globalRetention = globalRetention;
239243
}
240244

241-
public Response(StreamInput in) throws IOException {
242-
this(
243-
in.readCollectionAsList(DataStreamLifecycle::new),
244-
in.readOptionalWriteable(RolloverConfiguration::new),
245-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
246-
? in.readOptionalWriteable(DataStreamGlobalRetention::read)
247-
: null
248-
);
249-
}
250-
251245
public List<DataStreamLifecycle> getDataStreamLifecycles() {
252246
return dataStreamLifecycles;
253247
}
@@ -261,6 +255,11 @@ public DataStreamGlobalRetention getGlobalRetention() {
261255
return globalRetention;
262256
}
263257

258+
/**
259+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
260+
* we no longer need to support calling this action remotely.
261+
*/
262+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
264263
@Override
265264
public void writeTo(StreamOutput out) throws IOException {
266265
out.writeCollection(dataStreamLifecycles);

0 commit comments

Comments
 (0)