Skip to content

Commit 5374c33

Browse files
authored
[Downsampling++] Add time series telemetry in xpack usage (#134214)
In this PR we introduce telemetry for time series, trying to answer the following questions: - How many time series data streams (tsds) does a cluster have? - How many time series indices do they have? - How many tsds are downsampled by ILM? - How many downsampling rounds are being used with ILM? - How many tsds are downsampled by DLM? - How many downsampling rounds are being used with DLM? - How do the numbers differ between serverless and stateful? - Which ILM phase is most commonly used for downsampling? Fixes: #133953
1 parent 2e8fbe9 commit 5374c33

File tree

10 files changed

+1129
-1
lines changed

10 files changed

+1129
-1
lines changed

docs/changelog/134214.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 134214
2+
summary: "[Downsampling++] Add time series telemetry in xpack usage"
3+
area: Downsampling
4+
type: enhancement
5+
issues:
6+
- 133953

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ static TransportVersion def(int id) {
344344
public static final TransportVersion INFERENCE_API_DISABLE_EIS_RATE_LIMITING = def(9_152_0_00);
345345
public static final TransportVersion GEMINI_THINKING_BUDGET_ADDED = def(9_153_0_00);
346346
public static final TransportVersion VISIT_PERCENTAGE = def(9_154_0_00);
347+
public static final TransportVersion TIME_SERIES_TELEMETRY = def(9_155_0_00);
347348

348349
/*
349350
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/TimeSeriesUsageTransportActionIT.java

Lines changed: 533 additions & 0 deletions
Large diffs are not rendered by default.

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public final class XPackField {
7474
public static final String DATA_STREAMS = "data_streams";
7575
/** Name constant for the data stream lifecycle feature. */
7676
public static final String DATA_STREAM_LIFECYCLE = "data_lifecycle";
77+
/** Name constant for the time series data streams feature. */
78+
public static final String TIME_SERIES_DATA_STREAMS = "time_series";
7779
/** Name constant for the data tiers feature. */
7880
public static final String DATA_TIERS = "data_tiers";
7981
/** Name constant for the aggregate_metric plugin. */

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import org.elasticsearch.xpack.core.action.DataStreamInfoTransportAction;
9292
import org.elasticsearch.xpack.core.action.DataStreamLifecycleUsageTransportAction;
9393
import org.elasticsearch.xpack.core.action.DataStreamUsageTransportAction;
94+
import org.elasticsearch.xpack.core.action.TimeSeriesUsageTransportAction;
9495
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
9596
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
9697
import org.elasticsearch.xpack.core.action.XPackInfoAction;
@@ -374,6 +375,7 @@ public List<ActionHandler> getActions() {
374375
actions.add(new ActionHandler(XPackUsageFeatureAction.HEALTH, HealthApiUsageTransportAction.class));
375376
actions.add(new ActionHandler(XPackUsageFeatureAction.REMOTE_CLUSTERS, RemoteClusterUsageTransportAction.class));
376377
actions.add(new ActionHandler(NodesDataTiersUsageTransportAction.TYPE, NodesDataTiersUsageTransportAction.class));
378+
actions.add(new ActionHandler(XPackUsageFeatureAction.TIME_SERIES_DATA_STREAMS, TimeSeriesUsageTransportAction.class));
377379
return actions;
378380
}
379381

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.action;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.DataStream;
14+
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
17+
import org.elasticsearch.cluster.project.ProjectResolver;
18+
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.index.Index;
20+
import org.elasticsearch.index.IndexMode;
21+
import org.elasticsearch.injection.guice.Inject;
22+
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
23+
import org.elasticsearch.tasks.Task;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.elasticsearch.transport.TransportService;
26+
import org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsage;
27+
import org.elasticsearch.xpack.core.ilm.DownsampleAction;
28+
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
29+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
30+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
31+
import org.elasticsearch.xpack.core.ilm.Phase;
32+
33+
import java.util.HashMap;
34+
import java.util.LongSummaryStatistics;
35+
import java.util.Map;
36+
import java.util.Objects;
37+
38+
/**
39+
* Exposes the time series telemetry via the xpack usage API. We track the following only for time series data streams:
40+
* - time series data stream count
41+
* - time series backing indices of these time series data streams
42+
* - the feature that downsamples the time series data streams, we use the write index to avoid resolving templates,
43+
* this might cause a small delay in the counters (backing indices, downsampling rounds).
44+
* - For ILM specifically, we count the phases that have configured downsampling in the policies used in the time series data streams.
45+
* - When elasticsearch is running in DLM only mode, we skip all the ILM metrics.
46+
*/
47+
public class TimeSeriesUsageTransportAction extends XPackUsageFeatureTransportAction {
48+
49+
private final ProjectResolver projectResolver;
50+
private final boolean ilmAvailable;
51+
52+
@Inject
53+
public TimeSeriesUsageTransportAction(
54+
TransportService transportService,
55+
ClusterService clusterService,
56+
ThreadPool threadPool,
57+
ActionFilters actionFilters,
58+
ProjectResolver projectResolver
59+
) {
60+
super(XPackUsageFeatureAction.TIME_SERIES_DATA_STREAMS.name(), transportService, clusterService, threadPool, actionFilters);
61+
this.projectResolver = projectResolver;
62+
this.ilmAvailable = DataStreamLifecycle.isDataStreamsLifecycleOnlyMode(clusterService.getSettings()) == false;
63+
}
64+
65+
@Override
66+
protected void localClusterStateOperation(
67+
Task task,
68+
XPackUsageRequest request,
69+
ClusterState state,
70+
ActionListener<XPackUsageFeatureResponse> listener
71+
) {
72+
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(state);
73+
IndexLifecycleMetadata ilmMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
74+
final Map<String, DataStream> dataStreams = projectMetadata.dataStreams();
75+
76+
long tsDataStreamCount = 0;
77+
long tsIndexCount = 0;
78+
IlmDownsamplingStatsTracker ilmStats = ilmAvailable ? new IlmDownsamplingStatsTracker() : null;
79+
DownsamplingStatsTracker dlmStats = new DownsamplingStatsTracker();
80+
Map<String, Long> indicesByInterval = new HashMap<>();
81+
82+
for (DataStream ds : dataStreams.values()) {
83+
// We choose to not count time series backing indices that do not belong to a time series data stream.
84+
if (ds.getIndexMode() != IndexMode.TIME_SERIES) {
85+
continue;
86+
}
87+
tsDataStreamCount++;
88+
Integer dlmRounds = ds.getDataLifecycle() == null || ds.getDataLifecycle().downsampling() == null
89+
? null
90+
: ds.getDataLifecycle().downsampling().size();
91+
92+
for (Index backingIndex : ds.getIndices()) {
93+
IndexMetadata indexMetadata = projectMetadata.index(backingIndex);
94+
if (indexMetadata.getIndexMode() != IndexMode.TIME_SERIES) {
95+
continue;
96+
}
97+
tsIndexCount++;
98+
if (ds.isIndexManagedByDataStreamLifecycle(indexMetadata.getIndex(), ignored -> indexMetadata) && dlmRounds != null) {
99+
dlmStats.trackIndex(ds, indexMetadata);
100+
dlmStats.trackRounds(dlmRounds, ds, indexMetadata);
101+
} else if (ilmAvailable && projectMetadata.isIndexManagedByILM(indexMetadata)) {
102+
LifecyclePolicyMetadata policyMetadata = ilmMetadata.getPolicyMetadatas().get(indexMetadata.getLifecyclePolicyName());
103+
if (policyMetadata == null) {
104+
continue;
105+
}
106+
int rounds = 0;
107+
for (Phase phase : policyMetadata.getPolicy().getPhases().values()) {
108+
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
109+
rounds++;
110+
}
111+
}
112+
if (rounds > 0) {
113+
ilmStats.trackPolicy(policyMetadata.getPolicy());
114+
ilmStats.trackIndex(ds, indexMetadata);
115+
ilmStats.trackRounds(rounds, ds, indexMetadata);
116+
}
117+
}
118+
String interval = indexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.getKey());
119+
if (interval != null) {
120+
Long count = indicesByInterval.computeIfAbsent(interval, ignored -> 0L);
121+
indicesByInterval.put(interval, count + 1);
122+
}
123+
}
124+
}
125+
126+
final TimeSeriesFeatureSetUsage usage = ilmAvailable
127+
? new TimeSeriesFeatureSetUsage(
128+
tsDataStreamCount,
129+
tsIndexCount,
130+
ilmStats.getDownsamplingStats(),
131+
ilmStats.getIlmPolicyStats(),
132+
dlmStats.getDownsamplingStats(),
133+
indicesByInterval
134+
)
135+
: new TimeSeriesFeatureSetUsage(tsDataStreamCount, tsIndexCount, dlmStats.getDownsamplingStats(), indicesByInterval);
136+
listener.onResponse(new XPackUsageFeatureResponse(usage));
137+
}
138+
139+
private static class DownsamplingStatsTracker {
140+
private long downsampledDataStreams = 0;
141+
private long downsampledIndices = 0;
142+
private final LongSummaryStatistics rounds = new LongSummaryStatistics();
143+
144+
void trackIndex(DataStream ds, IndexMetadata indexMetadata) {
145+
if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) {
146+
downsampledDataStreams++;
147+
}
148+
downsampledIndices++;
149+
}
150+
151+
void trackRounds(int rounds, DataStream ds, IndexMetadata indexMetadata) {
152+
// We want to track rounds per data stream, so we use the write index to determine the
153+
// rounds applicable for this data stream
154+
if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) {
155+
this.rounds.accept(rounds);
156+
}
157+
}
158+
159+
TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() {
160+
return new TimeSeriesFeatureSetUsage.DownsamplingFeatureStats(
161+
downsampledDataStreams,
162+
downsampledIndices,
163+
rounds.getMin(),
164+
rounds.getAverage(),
165+
rounds.getMax()
166+
);
167+
}
168+
}
169+
170+
static class IlmDownsamplingStatsTracker extends DownsamplingStatsTracker {
171+
private final Map<String, Map<String, Phase>> policies = new HashMap<>();
172+
173+
void trackPolicy(LifecyclePolicy ilmPolicy) {
174+
policies.putIfAbsent(ilmPolicy.getName(), ilmPolicy.getPhases());
175+
}
176+
177+
Map<String, Long> getIlmPolicyStats() {
178+
if (policies.isEmpty()) {
179+
return Map.of();
180+
}
181+
Map<String, Long> downsamplingPhases = new HashMap<>();
182+
for (String ilmPolicy : policies.keySet()) {
183+
for (Phase phase : policies.get(ilmPolicy).values()) {
184+
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
185+
Long current = downsamplingPhases.computeIfAbsent(phase.getName(), ignored -> 0L);
186+
downsamplingPhases.put(phase.getName(), current + 1);
187+
}
188+
}
189+
}
190+
return downsamplingPhases;
191+
}
192+
}
193+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/XPackUsageFeatureAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ private XPackUsageFeatureAction() {/* no instances */}
5353
public static final ActionType<XPackUsageFeatureResponse> DATA_STREAM_LIFECYCLE = xpackUsageFeatureAction(
5454
XPackField.DATA_STREAM_LIFECYCLE
5555
);
56+
public static final ActionType<XPackUsageFeatureResponse> TIME_SERIES_DATA_STREAMS = xpackUsageFeatureAction(
57+
XPackField.TIME_SERIES_DATA_STREAMS
58+
);
5659
public static final ActionType<XPackUsageFeatureResponse> DATA_TIERS = xpackUsageFeatureAction(XPackField.DATA_TIERS);
5760
public static final ActionType<XPackUsageFeatureResponse> AGGREGATE_METRIC = xpackUsageFeatureAction(XPackField.AGGREGATE_METRIC);
5861
public static final ActionType<XPackUsageFeatureResponse> ARCHIVE = xpackUsageFeatureAction(XPackField.ARCHIVE);
@@ -91,7 +94,8 @@ private XPackUsageFeatureAction() {/* no instances */}
9194
REMOTE_CLUSTERS,
9295
ENTERPRISE_SEARCH,
9396
UNIVERSAL_PROFILING,
94-
LOGSDB
97+
LOGSDB,
98+
TIME_SERIES_DATA_STREAMS
9599
);
96100

97101
public static ActionType<XPackUsageFeatureResponse> xpackUsageFeatureAction(String suffix) {

0 commit comments

Comments
 (0)