Skip to content

Commit 0c5f281

Browse files
committed
Run TransportExplainDataStreamLifecycleAction on local node
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout. Relates #101805
1 parent 4e0e36b commit 0c5f281

File tree

4 files changed

+52
-91
lines changed

4 files changed

+52
-91
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction;
1515
import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
1616
import org.elasticsearch.action.support.ActionFilters;
17-
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
17+
import org.elasticsearch.action.support.ChannelActionListener;
18+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1819
import org.elasticsearch.cluster.ClusterState;
1920
import org.elasticsearch.cluster.block.ClusterBlockException;
2021
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -28,8 +29,10 @@
2829
import org.elasticsearch.cluster.service.ClusterService;
2930
import org.elasticsearch.common.settings.ClusterSettings;
3031
import org.elasticsearch.core.TimeValue;
32+
import org.elasticsearch.core.UpdateForV10;
3133
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
3234
import org.elasticsearch.injection.guice.Inject;
35+
import org.elasticsearch.tasks.CancellableTask;
3336
import org.elasticsearch.tasks.Task;
3437
import org.elasticsearch.threadpool.ThreadPool;
3538
import org.elasticsearch.transport.TransportService;
@@ -40,13 +43,20 @@
4043
/**
4144
* Transport action handling the explain the data stream lifecycle requests for one or more data stream lifecycle managed indices.
4245
*/
43-
public class TransportExplainDataStreamLifecycleAction extends TransportMasterNodeReadAction<
46+
public class TransportExplainDataStreamLifecycleAction extends TransportLocalClusterStateAction<
4447
ExplainDataStreamLifecycleAction.Request,
4548
ExplainDataStreamLifecycleAction.Response> {
4649

50+
private final IndexNameExpressionResolver indexNameExpressionResolver;
4751
private final DataStreamLifecycleErrorStore errorStore;
4852
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
4953

54+
/**
55+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
56+
* we no longer need to support calling this action remotely.
57+
*/
58+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
59+
@SuppressWarnings("this-escape")
5060
@Inject
5161
public TransportExplainDataStreamLifecycleAction(
5262
TransportService transportService,
@@ -59,27 +69,32 @@ public TransportExplainDataStreamLifecycleAction(
5969
) {
6070
super(
6171
ExplainDataStreamLifecycleAction.INSTANCE.name(),
62-
transportService,
63-
clusterService,
64-
threadPool,
6572
actionFilters,
66-
ExplainDataStreamLifecycleAction.Request::new,
67-
indexNameExpressionResolver,
68-
ExplainDataStreamLifecycleAction.Response::new,
73+
transportService.getTaskManager(),
74+
clusterService,
6975
threadPool.executor(ThreadPool.Names.MANAGEMENT)
7076
);
77+
this.indexNameExpressionResolver = indexNameExpressionResolver;
7178
this.errorStore = dataLifecycleServiceErrorStore;
7279
this.globalRetentionSettings = globalRetentionSettings;
80+
81+
transportService.registerRequestHandler(
82+
actionName,
83+
executor,
84+
false,
85+
true,
86+
ExplainDataStreamLifecycleAction.Request::new,
87+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
88+
);
7389
}
7490

7591
@Override
76-
protected void masterOperation(
92+
protected void localClusterStateOperation(
7793
Task task,
7894
ExplainDataStreamLifecycleAction.Request request,
7995
ClusterState state,
8096
ActionListener<ExplainDataStreamLifecycleAction.Response> listener
8197
) throws Exception {
82-
8398
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request);
8499
List<ExplainIndexDataStreamLifecycle> explainIndices = new ArrayList<>(concreteIndices.length);
85100
Metadata metadata = state.metadata();
@@ -114,6 +129,7 @@ protected void masterOperation(
114129
explainIndices.add(explainIndexDataStreamLifecycle);
115130
}
116131

132+
((CancellableTask) task).ensureNotCancelled();
117133
ClusterSettings clusterSettings = clusterService.getClusterSettings();
118134
listener.onResponse(
119135
new ExplainDataStreamLifecycleAction.Response(

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestExplainDataStreamLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.rest.RestRequest;
1919
import org.elasticsearch.rest.Scope;
2020
import org.elasticsearch.rest.ServerlessScope;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2122
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
2223

2324
import java.util.List;
@@ -48,7 +49,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
4849
);
4950
explainRequest.includeDefaults(restRequest.paramAsBoolean("include_defaults", false));
5051
explainRequest.indicesOptions(IndicesOptions.fromRequest(restRequest, IndicesOptions.strictExpandOpen()));
51-
return channel -> client.execute(
52+
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
5253
ExplainDataStreamLifecycleAction.INSTANCE,
5354
explainRequest,
5455
new RestRefCountedChunkedToXContentListener<>(channel)

server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.elasticsearch.action.IndicesRequest;
1717
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
1818
import org.elasticsearch.action.support.IndicesOptions;
19-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
19+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
2020
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
2121
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
2222
import org.elasticsearch.common.collect.Iterators;
@@ -25,13 +25,18 @@
2525
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
2626
import org.elasticsearch.core.Nullable;
2727
import org.elasticsearch.core.TimeValue;
28+
import org.elasticsearch.core.UpdateForV10;
29+
import org.elasticsearch.tasks.CancellableTask;
30+
import org.elasticsearch.tasks.Task;
31+
import org.elasticsearch.tasks.TaskId;
2832
import org.elasticsearch.xcontent.ParseField;
2933
import org.elasticsearch.xcontent.ToXContent;
3034

3135
import java.io.IOException;
3236
import java.util.Arrays;
3337
import java.util.Iterator;
3438
import java.util.List;
39+
import java.util.Map;
3540
import java.util.Objects;
3641

3742
/**
@@ -46,7 +51,7 @@ private ExplainDataStreamLifecycleAction() {/* no instances */}
4651
/**
4752
* Request explaining the data stream lifecycle for one or more indices.
4853
*/
49-
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
54+
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
5055
private String[] names;
5156
private boolean includeDefaults;
5257
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();
@@ -70,26 +75,28 @@ public ActionRequestValidationException validate() {
7075
return null;
7176
}
7277

78+
@Override
79+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
80+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
81+
}
82+
7383
@Override
7484
public boolean includeDataStreams() {
7585
return true;
7686
}
7787

88+
/**
89+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
90+
* we no longer need to support calling this action remotely.
91+
*/
92+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
7893
public Request(StreamInput in) throws IOException {
7994
super(in);
8095
this.names = in.readOptionalStringArray();
8196
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
8297
this.includeDefaults = in.readBoolean();
8398
}
8499

85-
@Override
86-
public void writeTo(StreamOutput out) throws IOException {
87-
super.writeTo(out);
88-
out.writeOptionalStringArray(names);
89-
indicesOptions.writeIndicesOptions(out);
90-
out.writeBoolean(includeDefaults);
91-
}
92-
93100
@Override
94101
public boolean equals(Object o) {
95102
if (this == o) {
@@ -159,15 +166,6 @@ public Response(
159166
this.globalRetention = globalRetention;
160167
}
161168

162-
public Response(StreamInput in) throws IOException {
163-
super(in);
164-
this.indices = in.readCollectionAsList(ExplainIndexDataStreamLifecycle::new);
165-
this.rolloverConfiguration = in.readOptionalWriteable(RolloverConfiguration::new);
166-
this.globalRetention = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
167-
? in.readOptionalWriteable(DataStreamGlobalRetention::read)
168-
: null;
169-
}
170-
171169
public List<ExplainIndexDataStreamLifecycle> getIndices() {
172170
return indices;
173171
}
@@ -180,6 +178,11 @@ public DataStreamGlobalRetention getGlobalRetention() {
180178
return globalRetention;
181179
}
182180

181+
/**
182+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
183+
* we no longer need to support calling this action remotely.
184+
*/
185+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
183186
@Override
184187
public void writeTo(StreamOutput out) throws IOException {
185188
out.writeCollection(indices);

server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,16 @@
1313
import org.elasticsearch.action.admin.indices.rollover.MinPrimaryShardDocsCondition;
1414
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
1515
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
16-
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
1716
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
1817
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1918
import org.elasticsearch.common.bytes.BytesReference;
20-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
21-
import org.elasticsearch.common.io.stream.Writeable;
2219
import org.elasticsearch.common.xcontent.XContentHelper;
2320
import org.elasticsearch.core.Nullable;
2421
import org.elasticsearch.core.TimeValue;
25-
import org.elasticsearch.indices.IndicesModule;
2622
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
27-
import org.elasticsearch.test.AbstractWireSerializingTestCase;
28-
import org.elasticsearch.xcontent.NamedXContentRegistry;
23+
import org.elasticsearch.test.ESTestCase;
2924
import org.elasticsearch.xcontent.XContentBuilder;
3025
import org.elasticsearch.xcontent.XContentFactory;
31-
import org.junit.Before;
3226

3327
import java.io.IOException;
3428
import java.util.List;
@@ -39,26 +33,7 @@
3933
import static org.hamcrest.Matchers.is;
4034
import static org.hamcrest.Matchers.nullValue;
4135

42-
public class ExplainDataStreamLifecycleResponseTests extends AbstractWireSerializingTestCase<Response> {
43-
44-
private NamedWriteableRegistry namedWriteableRegistry;
45-
private NamedXContentRegistry xContentRegistry;
46-
47-
@Before
48-
public void setupNamedWriteableRegistry() {
49-
namedWriteableRegistry = new NamedWriteableRegistry(IndicesModule.getNamedWriteables());
50-
xContentRegistry = new NamedXContentRegistry(IndicesModule.getNamedXContents());
51-
}
52-
53-
@Override
54-
protected NamedXContentRegistry xContentRegistry() {
55-
return xContentRegistry;
56-
}
57-
58-
@Override
59-
protected NamedWriteableRegistry getNamedWriteableRegistry() {
60-
return namedWriteableRegistry;
61-
}
36+
public class ExplainDataStreamLifecycleResponseTests extends ESTestCase {
6237

6338
@SuppressWarnings("unchecked")
6439
public void testToXContent() throws IOException {
@@ -280,38 +255,4 @@ private static ExplainIndexDataStreamLifecycle createRandomIndexDataStreamLifecy
280255
: null
281256
);
282257
}
283-
284-
@Override
285-
protected Writeable.Reader<Response> instanceReader() {
286-
return Response::new;
287-
}
288-
289-
@Override
290-
protected Response createTestInstance() {
291-
return randomResponse();
292-
}
293-
294-
@Override
295-
protected Response mutateInstance(Response instance) {
296-
return randomResponse();
297-
}
298-
299-
private Response randomResponse() {
300-
return new Response(
301-
List.of(createRandomIndexDataStreamLifecycleExplanation(System.nanoTime(), randomBoolean() ? new DataStreamLifecycle() : null)),
302-
randomBoolean()
303-
? new RolloverConfiguration(
304-
new RolloverConditions(
305-
Map.of(MaxPrimaryShardDocsCondition.NAME, new MaxPrimaryShardDocsCondition(randomLongBetween(1000, 199_999_000)))
306-
)
307-
)
308-
: null,
309-
randomBoolean()
310-
? new DataStreamGlobalRetention(
311-
TimeValue.timeValueDays(randomIntBetween(1, 10)),
312-
TimeValue.timeValueDays(randomIntBetween(10, 20))
313-
)
314-
: null
315-
);
316-
}
317258
}

0 commit comments

Comments
 (0)