Skip to content

Commit f4910f1

Browse files
committed
Extend the telemetry to include the failures lifecycle data.
1 parent 4e02456 commit f4910f1

File tree

5 files changed

+569
-44
lines changed

5 files changed

+569
-44
lines changed
Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.action;
9+
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.action.support.PlainActionFuture;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
15+
import org.elasticsearch.cluster.metadata.DataStream;
16+
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
17+
import org.elasticsearch.cluster.metadata.DataStreamFailureStoreSettings;
18+
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
19+
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
20+
import org.elasticsearch.cluster.metadata.DataStreamOptions;
21+
import org.elasticsearch.cluster.metadata.Metadata;
22+
import org.elasticsearch.cluster.service.ClusterService;
23+
import org.elasticsearch.common.bytes.BytesReference;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.xcontent.XContentHelper;
26+
import org.elasticsearch.core.TimeValue;
27+
import org.elasticsearch.core.Tuple;
28+
import org.elasticsearch.index.Index;
29+
import org.elasticsearch.index.IndexMode;
30+
import org.elasticsearch.plugins.Plugin;
31+
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
32+
import org.elasticsearch.test.ESIntegTestCase;
33+
import org.elasticsearch.xcontent.ToXContent;
34+
import org.elasticsearch.xcontent.XContentBuilder;
35+
import org.elasticsearch.xcontent.XContentFactory;
36+
import org.elasticsearch.xcontent.XContentType;
37+
import org.elasticsearch.xpack.core.XPackClientPlugin;
38+
import org.junit.After;
39+
40+
import java.io.IOException;
41+
import java.util.ArrayList;
42+
import java.util.Collection;
43+
import java.util.HashMap;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
import java.util.concurrent.atomic.AtomicLong;
48+
import java.util.function.Function;
49+
import java.util.stream.IntStream;
50+
51+
import static org.elasticsearch.xpack.core.action.XPackUsageFeatureAction.DATA_STREAMS;
52+
import static org.hamcrest.Matchers.equalTo;
53+
54+
public class DataStreamUsageTransportActionIT extends ESIntegTestCase {
55+
/*
56+
* The DataStreamUsageTransportAction is not exposed in the xpack core plugin, so we have a special test plugin to do this
57+
*/
58+
@Override
59+
protected Collection<Class<? extends Plugin>> nodePlugins() {
60+
return List.of(TestDataStreamUsagePlugin.class);
61+
}
62+
63+
@After
64+
private void cleanup() throws Exception {
65+
updateClusterState(clusterState -> {
66+
ClusterState.Builder clusterStateBuilder = new ClusterState.Builder(clusterState);
67+
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
68+
metadataBuilder.dataStreams(Map.of(), Map.of());
69+
clusterStateBuilder.metadata(metadataBuilder);
70+
return clusterStateBuilder.build();
71+
});
72+
updateClusterSettings(
73+
Settings.builder()
74+
.putNull(DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey())
75+
.putNull(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey())
76+
);
77+
}
78+
79+
@SuppressWarnings("unchecked")
80+
public void testAction() throws Exception {
81+
// test empty results
82+
{
83+
Map<String, Object> map = getDataStreamUsage();
84+
assertThat(map.get("available"), equalTo(true));
85+
assertThat(map.get("enabled"), equalTo(true));
86+
assertThat(map.get("data_streams"), equalTo(0));
87+
assertThat(map.get("indices_count"), equalTo(0));
88+
89+
Map<String, Object> failureStoreMap = (Map<String, Object>) map.get("failure_store");
90+
assertThat(failureStoreMap.get("explicitly_enabled_count"), equalTo(0));
91+
assertThat(failureStoreMap.get("effectively_enabled_count"), equalTo(0));
92+
assertThat(failureStoreMap.get("failure_indices_count"), equalTo(0));
93+
94+
Map<String, Object> failuresLifecycleMap = (Map<String, Object>) failureStoreMap.get("lifecycle");
95+
assertThat(failuresLifecycleMap.get("explicitly_enabled_count"), equalTo(0));
96+
assertThat(failuresLifecycleMap.get("effectively_enabled_count"), equalTo(0));
97+
Map<String, Object> dataRetentionMap = (Map<String, Object>) failuresLifecycleMap.get("data_retention");
98+
assertThat(dataRetentionMap.size(), equalTo(1));
99+
assertThat(dataRetentionMap.get("configured_data_streams"), equalTo(0));
100+
101+
Map<String, Object> effectiveRetentionMap = (Map<String, Object>) failuresLifecycleMap.get("effective_retention");
102+
assertThat(effectiveRetentionMap.size(), equalTo(1));
103+
assertThat(effectiveRetentionMap.get("retained_data_streams"), equalTo(0));
104+
105+
Map<String, Object> globalRetentionMap = (Map<String, Object>) failuresLifecycleMap.get("global_retention");
106+
assertThat(globalRetentionMap.get("max"), equalTo(Map.of("defined", false)));
107+
assertThat(globalRetentionMap.get("default"), equalTo(Map.of("defined", false)));
108+
}
109+
110+
// Keep track of the data streams created
111+
int dataStreamsCount = randomIntBetween(1, 200);
112+
AtomicInteger backingIndicesCount = new AtomicInteger(0);
113+
AtomicInteger failureIndicesCount = new AtomicInteger(0);
114+
AtomicInteger explicitlyEnabledFailureStoreCount = new AtomicInteger(0);
115+
AtomicInteger effectivelyEnabledFailureStoreCount = new AtomicInteger(0);
116+
AtomicInteger explicitlyEnabledFailuresLifecycleCount = new AtomicInteger(0);
117+
AtomicInteger effectivelyEnabledFailuresLifecycleCount = new AtomicInteger(0);
118+
AtomicInteger failuresLifecycleWithRetention = new AtomicInteger(0);
119+
AtomicInteger failuresLifecycleWithDefaultRetention = new AtomicInteger(0);
120+
121+
AtomicLong totalRetentionTimes = new AtomicLong(0);
122+
AtomicLong minRetention = new AtomicLong(Long.MAX_VALUE);
123+
AtomicLong maxRetention = new AtomicLong(Long.MIN_VALUE);
124+
125+
TimeValue defaultRetention = TimeValue.timeValueDays(10);
126+
Settings.Builder settingsBuilder = Settings.builder()
127+
.put(DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING.getKey(), "mis-*");
128+
boolean useDefaultRetention = randomBoolean();
129+
if (useDefaultRetention) {
130+
settingsBuilder.put(
131+
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(),
132+
defaultRetention.getStringRep()
133+
);
134+
}
135+
updateClusterSettings(settingsBuilder);
136+
137+
/*
138+
* We now add a number of simulated data streams to the cluster state. Some have failure store, some don't. The ones with failure
139+
* store may or may not have lifecycle with varying retention periods. After adding them, we make sure the numbers add up.
140+
*/
141+
updateClusterState(clusterState -> {
142+
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata());
143+
Map<String, DataStream> dataStreamMap = new HashMap<>();
144+
for (int i = 0; i < dataStreamsCount; i++) {
145+
boolean replicated = randomBoolean();
146+
boolean systemDataStream = rarely();
147+
List<Index> backingIndices = IntStream.range(0, randomIntBetween(1, 10))
148+
.mapToObj(ignore -> new Index(randomAlphaOfLength(60), randomAlphaOfLength(60)))
149+
.toList();
150+
backingIndicesCount.addAndGet(backingIndices.size());
151+
List<Index> failureIndices = IntStream.range(0, randomIntBetween(0, 10))
152+
.mapToObj(ignore -> new Index(randomAlphaOfLength(60), randomAlphaOfLength(60)))
153+
.toList();
154+
failureIndicesCount.addAndGet(failureIndices.size());
155+
Boolean failureStoreEnabled = randomBoolean() ? null : randomBoolean();
156+
boolean enabledBySetting = failureStoreEnabled == null && randomBoolean() && systemDataStream == false;
157+
if (failureStoreEnabled == null) {
158+
if (enabledBySetting) {
159+
effectivelyEnabledFailureStoreCount.incrementAndGet();
160+
}
161+
} else if (failureStoreEnabled) {
162+
explicitlyEnabledFailureStoreCount.incrementAndGet();
163+
effectivelyEnabledFailureStoreCount.incrementAndGet();
164+
}
165+
DataStreamLifecycle lifecycle = randomBoolean()
166+
? null
167+
: new DataStreamLifecycle(randomBoolean(), TimeValue.timeValueDays(randomIntBetween(1, 10)), null);
168+
if (lifecycle != null && lifecycle.enabled()) {
169+
explicitlyEnabledFailuresLifecycleCount.incrementAndGet();
170+
effectivelyEnabledFailuresLifecycleCount.incrementAndGet();
171+
if (lifecycle.dataRetention() != null) {
172+
long retentionMillis = lifecycle.dataRetention().getMillis();
173+
totalRetentionTimes.addAndGet(retentionMillis);
174+
failuresLifecycleWithRetention.incrementAndGet();
175+
if (retentionMillis < minRetention.get()) {
176+
minRetention.set(retentionMillis);
177+
}
178+
if (retentionMillis > maxRetention.get()) {
179+
maxRetention.set(retentionMillis);
180+
}
181+
}
182+
}
183+
if (lifecycle == null
184+
&& (enabledBySetting || Boolean.TRUE.equals(failureStoreEnabled) || failureIndices.isEmpty() == false)) {
185+
effectivelyEnabledFailuresLifecycleCount.incrementAndGet();
186+
if (useDefaultRetention && systemDataStream == false) {
187+
failuresLifecycleWithDefaultRetention.incrementAndGet();
188+
}
189+
}
190+
DataStream dataStream = new DataStream(
191+
enabledBySetting ? "mis-" + randomAlphaOfLength(10) : randomAlphaOfLength(50),
192+
backingIndices,
193+
randomLongBetween(0, 1000),
194+
Map.of(),
195+
systemDataStream || randomBoolean(),
196+
replicated,
197+
systemDataStream,
198+
randomBoolean(),
199+
IndexMode.STANDARD,
200+
null,
201+
failureStoreEnabled == null && lifecycle == null
202+
? DataStreamOptions.EMPTY
203+
: new DataStreamOptions(new DataStreamFailureStore(failureStoreEnabled, lifecycle)),
204+
failureIndices,
205+
replicated == false && randomBoolean(),
206+
null
207+
);
208+
dataStreamMap.put(dataStream.getName(), dataStream);
209+
}
210+
metadataBuilder.dataStreams(dataStreamMap, Map.of());
211+
ClusterState.Builder clusterStateBuilder = new ClusterState.Builder(clusterState);
212+
clusterStateBuilder.metadata(metadataBuilder);
213+
return clusterStateBuilder.build();
214+
});
215+
216+
int retainedDataStreams = failuresLifecycleWithRetention.get() + failuresLifecycleWithDefaultRetention.get();
217+
218+
int expectedMinimumDataRetention = minRetention.get() == Long.MAX_VALUE ? 0 : minRetention.intValue();
219+
int expectedMinimumEffectiveRetention = failuresLifecycleWithDefaultRetention.get() > 0
220+
? (int) Math.min(minRetention.get(), defaultRetention.getMillis())
221+
: expectedMinimumDataRetention;
222+
223+
int expectedMaximumDataRetention = maxRetention.get() == Long.MIN_VALUE ? 0 : maxRetention.intValue();
224+
int expectedMaximumEffectiveRetention = failuresLifecycleWithDefaultRetention.get() > 0
225+
? (int) Math.max(maxRetention.get(), defaultRetention.getMillis())
226+
: expectedMaximumDataRetention;
227+
228+
double expectedAverageDataRetention = failuresLifecycleWithRetention.get() == 0
229+
? 0.0
230+
: totalRetentionTimes.doubleValue() / failuresLifecycleWithRetention.get();
231+
double expectedAverageEffectiveRetention = failuresLifecycleWithDefaultRetention.get() > 0
232+
? (totalRetentionTimes.doubleValue() + failuresLifecycleWithDefaultRetention.get() * defaultRetention.getMillis())
233+
/ retainedDataStreams
234+
: expectedAverageDataRetention;
235+
236+
Map<String, Object> map = getDataStreamUsage();
237+
assertThat(map.get("available"), equalTo(true));
238+
assertThat(map.get("enabled"), equalTo(true));
239+
assertThat(map.get("data_streams"), equalTo(dataStreamsCount));
240+
assertThat(map.get("indices_count"), equalTo(backingIndicesCount.get()));
241+
242+
Map<String, Object> failureStoreMap = (Map<String, Object>) map.get("failure_store");
243+
assertThat(failureStoreMap.get("explicitly_enabled_count"), equalTo(explicitlyEnabledFailureStoreCount.get()));
244+
assertThat(failureStoreMap.get("effectively_enabled_count"), equalTo(effectivelyEnabledFailureStoreCount.get()));
245+
assertThat(failureStoreMap.get("failure_indices_count"), equalTo(failureIndicesCount.get()));
246+
247+
Map<String, Object> failuresLifecycleMap = (Map<String, Object>) failureStoreMap.get("lifecycle");
248+
assertThat(failuresLifecycleMap.get("explicitly_enabled_count"), equalTo(explicitlyEnabledFailuresLifecycleCount.get()));
249+
assertThat(failuresLifecycleMap.get("effectively_enabled_count"), equalTo(effectivelyEnabledFailuresLifecycleCount.get()));
250+
251+
Map<String, Object> dataRetentionMap = (Map<String, Object>) failuresLifecycleMap.get("data_retention");
252+
assertThat(dataRetentionMap.get("configured_data_streams"), equalTo(failuresLifecycleWithRetention.get()));
253+
if (failuresLifecycleWithRetention.get() > 0) {
254+
assertThat(dataRetentionMap.get("minimum_millis"), equalTo(expectedMinimumDataRetention));
255+
assertThat(dataRetentionMap.get("maximum_millis"), equalTo(expectedMaximumDataRetention));
256+
assertThat(dataRetentionMap.get("average_millis"), equalTo(expectedAverageDataRetention));
257+
}
258+
259+
Map<String, Object> effectiveRetentionMap = (Map<String, Object>) failuresLifecycleMap.get("effective_retention");
260+
assertThat(effectiveRetentionMap.get("retained_data_streams"), equalTo(retainedDataStreams));
261+
if (retainedDataStreams > 0) {
262+
assertThat(effectiveRetentionMap.get("minimum_millis"), equalTo(expectedMinimumEffectiveRetention));
263+
assertThat(effectiveRetentionMap.get("maximum_millis"), equalTo(expectedMaximumEffectiveRetention));
264+
assertThat(effectiveRetentionMap.get("average_millis"), equalTo(expectedAverageEffectiveRetention));
265+
}
266+
267+
Map<String, Map<String, Object>> globalRetentionMap = (Map<String, Map<String, Object>>) failuresLifecycleMap.get(
268+
"global_retention"
269+
);
270+
assertThat(globalRetentionMap.get("max").get("defined"), equalTo(false));
271+
assertThat(globalRetentionMap.get("default").get("defined"), equalTo(useDefaultRetention));
272+
if (useDefaultRetention) {
273+
assertThat(
274+
globalRetentionMap.get("default").get("affected_data_streams"),
275+
equalTo(failuresLifecycleWithDefaultRetention.get())
276+
);
277+
assertThat(globalRetentionMap.get("default").get("retention_millis"), equalTo((int) defaultRetention.getMillis()));
278+
}
279+
}
280+
281+
private Map<String, Object> getDataStreamUsage() throws IOException {
282+
XPackUsageFeatureResponse response = safeGet(client().execute(DATA_STREAMS, new XPackUsageRequest(SAFE_AWAIT_TIMEOUT)));
283+
XContentBuilder builder = XContentFactory.jsonBuilder();
284+
builder = response.getUsage().toXContent(builder, ToXContent.EMPTY_PARAMS);
285+
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(
286+
BytesReference.bytes(builder),
287+
true,
288+
XContentType.JSON
289+
);
290+
return tuple.v2();
291+
}
292+
293+
/*
294+
* Updates the cluster state in the internal cluster using the provided function
295+
*/
296+
protected static void updateClusterState(final Function<ClusterState, ClusterState> updater) throws Exception {
297+
final PlainActionFuture<Void> future = new PlainActionFuture<>();
298+
final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
299+
clusterService.submitUnbatchedStateUpdateTask("test", new ClusterStateUpdateTask() {
300+
@Override
301+
public ClusterState execute(ClusterState currentState) {
302+
return updater.apply(currentState);
303+
}
304+
305+
@Override
306+
public void onFailure(Exception e) {
307+
future.onFailure(e);
308+
}
309+
310+
@Override
311+
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
312+
future.onResponse(null);
313+
}
314+
});
315+
future.get();
316+
}
317+
318+
/*
319+
* This plugin exposes the DataStreamUsageTransportAction.
320+
*/
321+
public static final class TestDataStreamUsagePlugin extends XPackClientPlugin {
322+
@Override
323+
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
324+
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
325+
actions.add(new ActionHandler<>(DATA_STREAMS, DataStreamUsageTransportAction.class));
326+
return actions;
327+
}
328+
}
329+
}

0 commit comments

Comments
 (0)