Skip to content

Commit c650b43

Browse files
committed
Merge branch 'main' into local-get-data-stream-options
2 parents f75dc35 + bbc47d9 commit c650b43

File tree

11 files changed

+109
-65
lines changed

11 files changed

+109
-65
lines changed

docs/changelog/125214.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125214
2+
summary: Run `TransportGetDataStreamLifecycleAction` on local node
3+
area: Data streams
4+
type: enhancement
5+
issues: []

docs/changelog/125352.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 125352
2+
summary: Fix NPE in rolling over unknown target and return 404
3+
area: Indices APIs
4+
type: bug
5+
issues: []

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamRestActionCancellationIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.http.client.methods.HttpGet;
1313
import org.apache.http.client.methods.HttpPost;
1414
import org.elasticsearch.action.datastreams.GetDataStreamAction;
15+
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
1516
import org.elasticsearch.action.support.CancellableActionTestPlugin;
1617
import org.elasticsearch.action.support.PlainActionFuture;
1718
import org.elasticsearch.action.support.RefCountingListener;
@@ -65,6 +66,13 @@ public void testGetDataStreamCancellation() {
6566
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_data_stream?verbose"), GetDataStreamAction.NAME);
6667
}
6768

69+
public void testGetDataStreamLifecycleCancellation() {
70+
runRestActionCancellationTest(
71+
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_lifecycle"),
72+
GetDataStreamLifecycleAction.INSTANCE.name()
73+
);
74+
}
75+
6876
public void testGetDataStreamOptionsCancellation() {
6977
runRestActionCancellationTest(
7078
new Request(HttpGet.METHOD_NAME, "/_data_stream/test/_options"),

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleAction.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
1313
import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction;
1414
import org.elasticsearch.action.support.ActionFilters;
15-
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
15+
import org.elasticsearch.action.support.ChannelActionListener;
16+
import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction;
1617
import org.elasticsearch.cluster.ProjectState;
1718
import org.elasticsearch.cluster.block.ClusterBlockException;
1819
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -24,9 +25,10 @@
2425
import org.elasticsearch.cluster.service.ClusterService;
2526
import org.elasticsearch.common.settings.ClusterSettings;
2627
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.core.UpdateForV10;
2729
import org.elasticsearch.injection.guice.Inject;
30+
import org.elasticsearch.tasks.CancellableTask;
2831
import org.elasticsearch.tasks.Task;
29-
import org.elasticsearch.threadpool.ThreadPool;
3032
import org.elasticsearch.transport.TransportService;
3133

3234
import java.util.Comparator;
@@ -38,41 +40,52 @@
3840
* Collects the data streams from the cluster state, filters the ones that do not have a data stream lifecycle configured and then returns
3941
* a list of the data stream name and respective lifecycle configuration.
4042
*/
41-
public class TransportGetDataStreamLifecycleAction extends TransportMasterNodeReadProjectAction<
43+
public class TransportGetDataStreamLifecycleAction extends TransportLocalProjectMetadataAction<
4244
GetDataStreamLifecycleAction.Request,
4345
GetDataStreamLifecycleAction.Response> {
4446
private final ClusterSettings clusterSettings;
4547
private final IndexNameExpressionResolver indexNameExpressionResolver;
4648
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
4749

50+
/**
51+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
52+
* we no longer need to support calling this action remotely.
53+
*/
54+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
55+
@SuppressWarnings("this-escape")
4856
@Inject
4957
public TransportGetDataStreamLifecycleAction(
5058
TransportService transportService,
5159
ClusterService clusterService,
52-
ThreadPool threadPool,
5360
ActionFilters actionFilters,
5461
ProjectResolver projectResolver,
5562
IndexNameExpressionResolver indexNameExpressionResolver,
5663
DataStreamGlobalRetentionSettings globalRetentionSettings
5764
) {
5865
super(
5966
GetDataStreamLifecycleAction.INSTANCE.name(),
60-
transportService,
61-
clusterService,
62-
threadPool,
6367
actionFilters,
64-
GetDataStreamLifecycleAction.Request::new,
65-
projectResolver,
66-
GetDataStreamLifecycleAction.Response::new,
67-
EsExecutors.DIRECT_EXECUTOR_SERVICE
68+
transportService.getTaskManager(),
69+
clusterService,
70+
EsExecutors.DIRECT_EXECUTOR_SERVICE,
71+
projectResolver
6872
);
6973
clusterSettings = clusterService.getClusterSettings();
7074
this.indexNameExpressionResolver = indexNameExpressionResolver;
7175
this.globalRetentionSettings = globalRetentionSettings;
76+
77+
transportService.registerRequestHandler(
78+
actionName,
79+
executor,
80+
false,
81+
true,
82+
GetDataStreamLifecycleAction.Request::new,
83+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
84+
);
7285
}
7386

7487
@Override
75-
protected void masterOperation(
88+
protected void localClusterStateOperation(
7689
Task task,
7790
GetDataStreamLifecycleAction.Request request,
7891
ProjectState state,
@@ -86,6 +99,7 @@ protected void masterOperation(
8699
);
87100
Map<String, DataStream> dataStreams = state.metadata().dataStreams();
88101

102+
((CancellableTask) task).ensureNotCancelled();
89103
listener.onResponse(
90104
new GetDataStreamLifecycleAction.Response(
91105
results.stream()

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/rest/RestGetDataStreamLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.rest.RestUtils;
1919
import org.elasticsearch.rest.Scope;
2020
import org.elasticsearch.rest.ServerlessScope;
21+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
2122
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
2223

2324
import java.util.List;
@@ -46,7 +47,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4647
);
4748
getDataLifecycleRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
4849
getDataLifecycleRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataLifecycleRequest.indicesOptions()));
49-
return channel -> client.execute(
50+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
5051
GetDataStreamLifecycleAction.INSTANCE,
5152
getDataLifecycleRequest,
5253
new RestRefCountedChunkedToXContentListener<>(channel)

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.rollover/10_basic.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,19 @@
183183
min_age: "0s"
184184
min_docs: 1
185185
- match: { error.reason: "Validation Failed: 1: at least one max_* rollover condition must be set when using min_* conditions;" }
186+
187+
---
188+
"Rolling over an unknown target should return 404":
189+
- requires:
190+
capabilities:
191+
- method: POST
192+
path: /{index}/_rollover
193+
capabilities: ['return-404-on-missing-target']
194+
test_runner_features: [capabilities]
195+
reason: Rollover used to return a 400, then it briefly returned a 500 due to an NPE, now it properly returns a 404
196+
197+
- do:
198+
catch: missing
199+
indices.rollover:
200+
alias: "non_existent"
201+
- match: {error.reason: "rollover target [non_existent] does not exist"}

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.ResourceNotFoundException;
1415
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
1516
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1617
import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult;
@@ -151,7 +152,7 @@ public RolloverResult rolloverClusterState(
151152
@Nullable AutoShardingResult autoShardingResult,
152153
boolean isFailureStoreRollover
153154
) throws Exception {
154-
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest, isFailureStoreRollover);
155+
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
155156
final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
156157
return switch (indexAbstraction.getType()) {
157158
case ALIAS -> rolloverAlias(
@@ -193,7 +194,7 @@ public static NameResolution resolveRolloverNames(
193194
CreateIndexRequest createIndexRequest,
194195
boolean isFailureStoreRollover
195196
) {
196-
validate(project, rolloverTarget, newIndexName, createIndexRequest, isFailureStoreRollover);
197+
validate(project, rolloverTarget, newIndexName, createIndexRequest);
197198
final IndexAbstraction indexAbstraction = project.getIndicesLookup().get(rolloverTarget);
198199
return switch (indexAbstraction.getType()) {
199200
case ALIAS -> resolveAliasRolloverNames(project, indexAbstraction, newIndexName);
@@ -655,16 +656,10 @@ static void checkNoDuplicatedAliasInIndexTemplate(
655656
}
656657
}
657658

658-
static void validate(
659-
ProjectMetadata project,
660-
String rolloverTarget,
661-
String newIndexName,
662-
CreateIndexRequest request,
663-
boolean isFailureStoreRollover
664-
) {
659+
static void validate(ProjectMetadata project, String rolloverTarget, String newIndexName, CreateIndexRequest request) {
665660
final IndexAbstraction indexAbstraction = project.getIndicesLookup().get(rolloverTarget);
666661
if (indexAbstraction == null) {
667-
throw new IllegalArgumentException("rollover target [" + rolloverTarget + "] does not exist");
662+
throw new ResourceNotFoundException("rollover target [" + rolloverTarget + "] does not exist");
668663
}
669664
if (VALID_ROLLOVER_TARGETS.contains(indexAbstraction.getType()) == false) {
670665
throw new IllegalArgumentException(

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ protected ClusterBlockException checkBlock(RolloverRequest request, ClusterState
164164
ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(request.getRolloverTarget(), request.indicesOptions());
165165
final IndexAbstraction indexAbstraction = projectMetadata.getIndicesLookup().get(resolvedRolloverTarget.resource());
166166
final String[] indicesToCheck;
167-
if (indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
167+
if (indexAbstraction != null && indexAbstraction.getType().equals(IndexAbstraction.Type.DATA_STREAM)) {
168168
DataStream dataStream = (DataStream) indexAbstraction;
169169
boolean targetFailureStore = resolvedRolloverTarget.selector() != null
170170
&& resolvedRolloverTarget.selector().shouldIncludeFailures();

server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/GetDataStreamLifecycleAction.java

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.action.IndicesRequest;
1616
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
1717
import org.elasticsearch.action.support.IndicesOptions;
18-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
18+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1919
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
2020
import org.elasticsearch.common.collect.Iterators;
2121
import org.elasticsearch.common.io.stream.StreamInput;
@@ -24,6 +24,10 @@
2424
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
2525
import org.elasticsearch.core.Nullable;
2626
import org.elasticsearch.core.TimeValue;
27+
import org.elasticsearch.core.UpdateForV10;
28+
import org.elasticsearch.tasks.CancellableTask;
29+
import org.elasticsearch.tasks.Task;
30+
import org.elasticsearch.tasks.TaskId;
2731
import org.elasticsearch.xcontent.ParseField;
2832
import org.elasticsearch.xcontent.ToXContent;
2933
import org.elasticsearch.xcontent.ToXContentObject;
@@ -33,6 +37,7 @@
3337
import java.util.Arrays;
3438
import java.util.Iterator;
3539
import java.util.List;
40+
import java.util.Map;
3641
import java.util.Objects;
3742

3843
/**
@@ -44,7 +49,7 @@ public class GetDataStreamLifecycleAction {
4449

4550
private GetDataStreamLifecycleAction() {/* no instances */}
4651

47-
public static class Request extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
52+
public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable {
4853

4954
private String[] names;
5055
private IndicesOptions indicesOptions = IndicesOptions.builder()
@@ -89,21 +94,23 @@ public ActionRequestValidationException validate() {
8994
return null;
9095
}
9196

97+
@Override
98+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
99+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
100+
}
101+
102+
/**
103+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
104+
* we no longer need to support calling this action remotely.
105+
*/
106+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
92107
public Request(StreamInput in) throws IOException {
93108
super(in);
94109
this.names = in.readOptionalStringArray();
95110
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
96111
this.includeDefaults = in.readBoolean();
97112
}
98113

99-
@Override
100-
public void writeTo(StreamOutput out) throws IOException {
101-
super.writeTo(out);
102-
out.writeOptionalStringArray(names);
103-
indicesOptions.writeIndicesOptions(out);
104-
out.writeBoolean(includeDefaults);
105-
}
106-
107114
@Override
108115
public boolean equals(Object o) {
109116
if (this == o) return true;
@@ -169,14 +176,11 @@ public record DataStreamLifecycle(
169176
public static final ParseField NAME_FIELD = new ParseField("name");
170177
public static final ParseField LIFECYCLE_FIELD = new ParseField("lifecycle");
171178

172-
DataStreamLifecycle(StreamInput in) throws IOException {
173-
this(
174-
in.readString(),
175-
in.readOptionalWriteable(org.elasticsearch.cluster.metadata.DataStreamLifecycle::new),
176-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) && in.readBoolean()
177-
);
178-
}
179-
179+
/**
180+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
181+
* we no longer need to support calling this action remotely.
182+
*/
183+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
180184
@Override
181185
public void writeTo(StreamOutput out) throws IOException {
182186
out.writeString(dataStreamName);
@@ -238,16 +242,6 @@ public Response(
238242
this.globalRetention = globalRetention;
239243
}
240244

241-
public Response(StreamInput in) throws IOException {
242-
this(
243-
in.readCollectionAsList(DataStreamLifecycle::new),
244-
in.readOptionalWriteable(RolloverConfiguration::new),
245-
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)
246-
? in.readOptionalWriteable(DataStreamGlobalRetention::read)
247-
: null
248-
);
249-
}
250-
251245
public List<DataStreamLifecycle> getDataStreamLifecycles() {
252246
return dataStreamLifecycles;
253247
}
@@ -261,6 +255,11 @@ public DataStreamGlobalRetention getGlobalRetention() {
261255
return globalRetention;
262256
}
263257

258+
/**
259+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
260+
* we no longer need to support calling this action remotely.
261+
*/
262+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
264263
@Override
265264
public void writeTo(StreamOutput out) throws IOException {
266265
out.writeCollection(dataStreamLifecycles);

server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ public String getName() {
4444
@Override
4545
public Set<String> supportedCapabilities() {
4646
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
47-
return Set.of("lazy-rollover-failure-store", "index-expression-selectors");
47+
return Set.of("return-404-on-missing-target", "lazy-rollover-failure-store", "index-expression-selectors");
4848
} else {
49-
return Set.of();
49+
return Set.of("return-404-on-missing-target");
5050
}
5151
}
5252

0 commit comments

Comments
 (0)