Skip to content

Commit 184365e

Browse files
committed
Add 'verbose' flag retrieving maximum_timestamp for get data stream API
This commit adds support for the `verbose` querystring parameter to the get data stream API (`GET /_data_stream/{name}`). The flag defaults to "false". When set to true, the `maximum_timestamp` for the data stream will be retrieved and returned for each data stream retrieved. This is the same information available from the data stream stats API (and internally uses the same action to retrieval).
1 parent 3cbb526 commit 184365e

File tree

11 files changed

+267
-50
lines changed

11 files changed

+267
-50
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction;
3636
import org.elasticsearch.datastreams.action.DataStreamsStatsTransportAction;
3737
import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction;
38-
import org.elasticsearch.datastreams.action.GetDataStreamsTransportAction;
3938
import org.elasticsearch.datastreams.action.MigrateToDataStreamTransportAction;
4039
import org.elasticsearch.datastreams.action.ModifyDataStreamsTransportAction;
4140
import org.elasticsearch.datastreams.action.PromoteDataStreamTransportAction;
41+
import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction;
4242
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
4343
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
4444
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
@@ -218,7 +218,7 @@ public Collection<?> createComponents(PluginServices services) {
218218
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
219219
actions.add(new ActionHandler<>(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class));
220220
actions.add(new ActionHandler<>(DeleteDataStreamAction.INSTANCE, DeleteDataStreamTransportAction.class));
221-
actions.add(new ActionHandler<>(GetDataStreamAction.INSTANCE, GetDataStreamsTransportAction.class));
221+
actions.add(new ActionHandler<>(GetDataStreamAction.INSTANCE, TransportGetDataStreamsAction.class));
222222
actions.add(new ActionHandler<>(DataStreamsStatsAction.INSTANCE, DataStreamsStatsTransportAction.class));
223223
actions.add(new ActionHandler<>(MigrateToDataStreamAction.INSTANCE, MigrateToDataStreamTransportAction.class));
224224
actions.add(new ActionHandler<>(PromoteDataStreamAction.INSTANCE, PromoteDataStreamTransportAction.class));
Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.datastreams.DataStreamsActionUtil;
14+
import org.elasticsearch.action.datastreams.DataStreamsStatsAction;
1415
import org.elasticsearch.action.datastreams.GetDataStreamAction;
1516
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.IndexProperties;
1617
import org.elasticsearch.action.datastreams.GetDataStreamAction.Response.ManagedBy;
1718
import org.elasticsearch.action.support.ActionFilters;
1819
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
20+
import org.elasticsearch.client.internal.Client;
1921
import org.elasticsearch.cluster.ClusterState;
2022
import org.elasticsearch.cluster.block.ClusterBlockException;
2123
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -30,7 +32,7 @@
3032
import org.elasticsearch.cluster.service.ClusterService;
3133
import org.elasticsearch.common.settings.ClusterSettings;
3234
import org.elasticsearch.common.settings.Settings;
33-
import org.elasticsearch.common.util.concurrent.EsExecutors;
35+
import org.elasticsearch.core.Nullable;
3436
import org.elasticsearch.core.Tuple;
3537
import org.elasticsearch.index.Index;
3638
import org.elasticsearch.index.IndexMode;
@@ -43,31 +45,35 @@
4345

4446
import java.time.Instant;
4547
import java.util.ArrayList;
48+
import java.util.Arrays;
4649
import java.util.Comparator;
4750
import java.util.HashMap;
4851
import java.util.List;
4952
import java.util.Map;
53+
import java.util.stream.Collectors;
5054

5155
import static org.elasticsearch.index.IndexSettings.PREFER_ILM_SETTING;
5256

53-
public class GetDataStreamsTransportAction extends TransportMasterNodeReadAction<
57+
public class TransportGetDataStreamsAction extends TransportMasterNodeReadAction<
5458
GetDataStreamAction.Request,
5559
GetDataStreamAction.Response> {
5660

57-
private static final Logger LOGGER = LogManager.getLogger(GetDataStreamsTransportAction.class);
61+
private static final Logger LOGGER = LogManager.getLogger(TransportGetDataStreamsAction.class);
5862
private final SystemIndices systemIndices;
5963
private final ClusterSettings clusterSettings;
6064
private final DataStreamGlobalRetentionSettings globalRetentionSettings;
65+
private final Client client;
6166

6267
@Inject
63-
public GetDataStreamsTransportAction(
68+
public TransportGetDataStreamsAction(
6469
TransportService transportService,
6570
ClusterService clusterService,
6671
ThreadPool threadPool,
6772
ActionFilters actionFilters,
6873
IndexNameExpressionResolver indexNameExpressionResolver,
6974
SystemIndices systemIndices,
70-
DataStreamGlobalRetentionSettings globalRetentionSettings
75+
DataStreamGlobalRetentionSettings globalRetentionSettings,
76+
Client client
7177
) {
7278
super(
7379
GetDataStreamAction.NAME,
@@ -78,11 +84,12 @@ public GetDataStreamsTransportAction(
7884
GetDataStreamAction.Request::new,
7985
indexNameExpressionResolver,
8086
GetDataStreamAction.Response::new,
81-
EsExecutors.DIRECT_EXECUTOR_SERVICE
87+
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
8288
);
8389
this.systemIndices = systemIndices;
8490
this.globalRetentionSettings = globalRetentionSettings;
8591
clusterSettings = clusterService.getClusterSettings();
92+
this.client = client;
8693
}
8794

8895
@Override
@@ -92,9 +99,42 @@ protected void masterOperation(
9299
ClusterState state,
93100
ActionListener<GetDataStreamAction.Response> listener
94101
) throws Exception {
95-
listener.onResponse(
96-
innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings)
97-
);
102+
if (request.verbose()) {
103+
DataStreamsStatsAction.Request req = new DataStreamsStatsAction.Request();
104+
req.indices(request.indices());
105+
client.execute(DataStreamsStatsAction.INSTANCE, req, new ActionListener<>() {
106+
@Override
107+
public void onResponse(DataStreamsStatsAction.Response response) {
108+
final Map<String, Long> maxTimestamps = Arrays.stream(response.getDataStreams())
109+
.collect(
110+
Collectors.toMap(
111+
DataStreamsStatsAction.DataStreamStats::getDataStream,
112+
DataStreamsStatsAction.DataStreamStats::getMaximumTimestamp
113+
)
114+
);
115+
listener.onResponse(
116+
innerOperation(
117+
state,
118+
request,
119+
indexNameExpressionResolver,
120+
systemIndices,
121+
clusterSettings,
122+
globalRetentionSettings,
123+
maxTimestamps
124+
)
125+
);
126+
}
127+
128+
@Override
129+
public void onFailure(Exception e) {
130+
listener.onFailure(e);
131+
}
132+
});
133+
} else {
134+
listener.onResponse(
135+
innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings, globalRetentionSettings, null)
136+
);
137+
}
98138
}
99139

100140
static GetDataStreamAction.Response innerOperation(
@@ -103,7 +143,8 @@ static GetDataStreamAction.Response innerOperation(
103143
IndexNameExpressionResolver indexNameExpressionResolver,
104144
SystemIndices systemIndices,
105145
ClusterSettings clusterSettings,
106-
DataStreamGlobalRetentionSettings globalRetentionSettings
146+
DataStreamGlobalRetentionSettings globalRetentionSettings,
147+
@Nullable Map<String, Long> maxTimestamps
107148
) {
108149
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
109150
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
@@ -216,7 +257,8 @@ public int compareTo(IndexInfo o) {
216257
ilmPolicyName,
217258
timeSeries,
218259
backingIndicesSettingsValues,
219-
indexTemplatePreferIlmValue
260+
indexTemplatePreferIlmValue,
261+
maxTimestamps == null ? null : maxTimestamps.get(dataStream.getName())
220262
)
221263
);
222264
}

modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4545
);
4646
getDataStreamsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
4747
getDataStreamsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamsRequest.indicesOptions()));
48+
getDataStreamsRequest.verbose(request.paramAsBoolean("verbose", false));
4849
return channel -> client.execute(GetDataStreamAction.INSTANCE, getDataStreamsRequest, new RestToXContentListener<>(channel));
4950
}
5051

@@ -57,4 +58,19 @@ public boolean allowSystemIndexAccessByDefault() {
5758
public Set<String> supportedCapabilities() {
5859
return Set.of(DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY);
5960
}
61+
62+
@Override
63+
public Set<String> supportedQueryParameters() {
64+
return Set.of(
65+
"name",
66+
"include_defaults",
67+
"timeout",
68+
"master_timeout",
69+
IndicesOptions.WildcardOptions.EXPAND_WILDCARDS,
70+
IndicesOptions.ConcreteTargetOptions.IGNORE_UNAVAILABLE,
71+
IndicesOptions.WildcardOptions.ALLOW_NO_INDICES,
72+
IndicesOptions.GatekeeperOptions.IGNORE_THROTTLED,
73+
"verbose"
74+
);
75+
}
6076
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsRequestTests.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.datastreams.action;
99

1010
import org.elasticsearch.action.datastreams.GetDataStreamAction.Request;
11+
import org.elasticsearch.action.support.IndicesOptions;
1112
import org.elasticsearch.common.io.stream.Writeable;
1213
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1314

@@ -20,7 +21,7 @@ protected Writeable.Reader<Request> instanceReader() {
2021

2122
@Override
2223
protected Request createTestInstance() {
23-
return new Request(TEST_REQUEST_TIMEOUT, switch (randomIntBetween(1, 4)) {
24+
var req = new Request(TEST_REQUEST_TIMEOUT, switch (randomIntBetween(1, 4)) {
2425
case 1 -> generateRandomStringArray(3, 8, false, false);
2526
case 2 -> {
2627
String[] parameters = generateRandomStringArray(3, 8, false, false);
@@ -32,11 +33,40 @@ protected Request createTestInstance() {
3233
case 3 -> new String[] { "*" };
3334
default -> null;
3435
});
36+
req.verbose(randomBoolean());
37+
return req;
3538
}
3639

3740
@Override
3841
protected Request mutateInstance(Request instance) {
39-
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
42+
var indices = instance.indices();
43+
var indicesOpts = instance.indicesOptions();
44+
var includeDefaults = instance.includeDefaults();
45+
var verbose = instance.verbose();
46+
switch (randomIntBetween(0, 3)) {
47+
case 0 -> indices = randomValueOtherThan(indices, () -> generateRandomStringArray(3, 8, false, false));
48+
case 1 -> indicesOpts = randomValueOtherThan(
49+
indicesOpts,
50+
() -> IndicesOptions.fromOptions(
51+
randomBoolean(),
52+
randomBoolean(),
53+
randomBoolean(),
54+
randomBoolean(),
55+
randomBoolean(),
56+
randomBoolean(),
57+
randomBoolean(),
58+
randomBoolean(),
59+
randomBoolean()
60+
)
61+
);
62+
case 2 -> includeDefaults = includeDefaults == false;
63+
case 3 -> verbose = verbose == false;
64+
}
65+
var newReq = new Request(instance.masterNodeTimeout(), indices);
66+
newReq.includeDefaults(includeDefaults);
67+
newReq.indicesOptions(indicesOpts);
68+
newReq.verbose(verbose);
69+
return newReq;
4070
}
4171

4272
}

modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.index.Index;
2121
import org.elasticsearch.index.IndexMode;
2222
import org.elasticsearch.test.AbstractWireSerializingTestCase;
23+
import org.elasticsearch.test.ESTestCase;
2324
import org.elasticsearch.xcontent.ToXContent;
2425
import org.elasticsearch.xcontent.XContentBuilder;
2526
import org.elasticsearch.xcontent.XContentFactory;
@@ -104,7 +105,8 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
104105
null,
105106
null,
106107
indexSettingsValues,
107-
false
108+
false,
109+
null
108110
);
109111
Response response = new Response(List.of(dataStreamInfo));
110112
XContentBuilder contentBuilder = XContentFactory.jsonBuilder();
@@ -206,7 +208,8 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
206208
null,
207209
null,
208210
indexSettingsValues,
209-
false
211+
false,
212+
null
210213
);
211214
Response response = new Response(List.of(dataStreamInfo));
212215
XContentBuilder contentBuilder = XContentFactory.jsonBuilder();
@@ -283,7 +286,8 @@ private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance)
283286
var timeSeries = instance.getTimeSeries();
284287
var indexSettings = instance.getIndexSettingsValues();
285288
var templatePreferIlm = instance.templatePreferIlmValue();
286-
switch (randomIntBetween(0, 6)) {
289+
var maximumTimestamp = instance.getMaximumTimestamp();
290+
switch (randomIntBetween(0, 7)) {
287291
case 0 -> dataStream = randomValueOtherThan(dataStream, DataStreamTestHelper::randomInstance);
288292
case 1 -> status = randomValueOtherThan(status, () -> randomFrom(ClusterHealthStatus.values()));
289293
case 2 -> indexTemplate = randomBoolean() && indexTemplate != null ? null : randomAlphaOfLengthBetween(2, 10);
@@ -305,8 +309,20 @@ private Response.DataStreamInfo mutateInstance(Response.DataStreamInfo instance)
305309
)
306310
);
307311
case 6 -> templatePreferIlm = templatePreferIlm ? false : true;
312+
case 7 -> maximumTimestamp = (maximumTimestamp == null)
313+
? randomNonNegativeLong()
314+
: (usually() ? randomValueOtherThan(maximumTimestamp, ESTestCase::randomNonNegativeLong) : null);
308315
}
309-
return new Response.DataStreamInfo(dataStream, status, indexTemplate, ilmPolicyName, timeSeries, indexSettings, templatePreferIlm);
316+
return new Response.DataStreamInfo(
317+
dataStream,
318+
status,
319+
indexTemplate,
320+
ilmPolicyName,
321+
timeSeries,
322+
indexSettings,
323+
templatePreferIlm,
324+
maximumTimestamp
325+
);
310326
}
311327

312328
private List<Tuple<Instant, Instant>> generateRandomTimeSeries() {
@@ -342,7 +358,8 @@ private Response.DataStreamInfo generateRandomDataStreamInfo() {
342358
randomAlphaOfLengthBetween(2, 10),
343359
timeSeries != null ? new Response.TimeSeries(timeSeries) : null,
344360
generateRandomIndexSettingsValues(),
345-
randomBoolean()
361+
randomBoolean(),
362+
usually() ? randomNonNegativeLong() : null
346363
);
347364
}
348365
}

0 commit comments

Comments
 (0)