Skip to content
6 changes: 6 additions & 0 deletions docs/changelog/134214.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134214
summary: "[Downsampling++] Add time series telemetry in xpack usage"
area: Downsampling
type: enhancement
issues:
- 133953
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_API_DISABLE_EIS_RATE_LIMITING = def(9_152_0_00);
public static final TransportVersion GEMINI_THINKING_BUDGET_ADDED = def(9_153_0_00);
public static final TransportVersion VISIT_PERCENTAGE = def(9_154_0_00);
public static final TransportVersion TIME_SERIES_TELEMETRY = def(9_155_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public final class XPackField {
public static final String DATA_STREAMS = "data_streams";
/** Name constant for the data stream lifecycle feature. */
public static final String DATA_STREAM_LIFECYCLE = "data_lifecycle";
/** Name constant for the time series data streams feature. */
public static final String TIME_SERIES_DATA_STREAMS = "time_series";
/** Name constant for the data tiers feature. */
public static final String DATA_TIERS = "data_tiers";
/** Name constant for the aggregate_metric plugin. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.elasticsearch.xpack.core.action.DataStreamInfoTransportAction;
import org.elasticsearch.xpack.core.action.DataStreamLifecycleUsageTransportAction;
import org.elasticsearch.xpack.core.action.DataStreamUsageTransportAction;
import org.elasticsearch.xpack.core.action.TimeSeriesUsageTransportAction;
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
Expand Down Expand Up @@ -374,6 +375,7 @@ public List<ActionHandler> getActions() {
actions.add(new ActionHandler(XPackUsageFeatureAction.HEALTH, HealthApiUsageTransportAction.class));
actions.add(new ActionHandler(XPackUsageFeatureAction.REMOTE_CLUSTERS, RemoteClusterUsageTransportAction.class));
actions.add(new ActionHandler(NodesDataTiersUsageTransportAction.TYPE, NodesDataTiersUsageTransportAction.class));
actions.add(new ActionHandler(XPackUsageFeatureAction.TIME_SERIES_DATA_STREAMS, TimeSeriesUsageTransportAction.class));
return actions;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.protocol.xpack.XPackUsageRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.datastreams.TimeSeriesFeatureSetUsage;
import org.elasticsearch.xpack.core.ilm.DownsampleAction;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.Phase;

import java.util.HashMap;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Objects;

/**
* Exposes the time series telemetry via the xpack usage API. We track the following only for time series data streams:
* - time series data stream count
* - time series backing indices of these time series data streams
* - the feature that downsamples the time series data streams, we use the write index to avoid resolving templates,
* this might cause a small delay in the counters (backing indices, downsampling rounds).
* - For ILM specifically, we count the phases that have configured downsampling in the policies used in the time series data streams.
* - When elasticsearch is running in DLM only mode, we skip all the ILM metrics.
*/
public class TimeSeriesUsageTransportAction extends XPackUsageFeatureTransportAction {

private final ProjectResolver projectResolver;
private final boolean ilmAvailable;

@Inject
public TimeSeriesUsageTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(XPackUsageFeatureAction.TIME_SERIES_DATA_STREAMS.name(), transportService, clusterService, threadPool, actionFilters);
this.projectResolver = projectResolver;
this.ilmAvailable = DataStreamLifecycle.isDataStreamsLifecycleOnlyMode(clusterService.getSettings()) == false;
}

@Override
protected void localClusterStateOperation(
Task task,
XPackUsageRequest request,
ClusterState state,
ActionListener<XPackUsageFeatureResponse> listener
) {
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(state);
IndexLifecycleMetadata ilmMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
final Map<String, DataStream> dataStreams = projectMetadata.dataStreams();

long tsDataStreamCount = 0;
long tsIndexCount = 0;
IlmDownsamplingStatsTracker ilmStats = ilmAvailable ? new IlmDownsamplingStatsTracker() : null;
DownsamplingStatsTracker dlmStats = new DownsamplingStatsTracker();
Map<String, Long> indicesByInterval = new HashMap<>();

for (DataStream ds : dataStreams.values()) {
// We choose to not count time series backing indices that do not belong to a time series data stream.
if (ds.getIndexMode() != IndexMode.TIME_SERIES) {
continue;
}
tsDataStreamCount++;
Integer dlmRounds = ds.getDataLifecycle() == null || ds.getDataLifecycle().downsampling() == null
? null
: ds.getDataLifecycle().downsampling().size();

for (Index backingIndex : ds.getIndices()) {
IndexMetadata indexMetadata = projectMetadata.index(backingIndex);
if (indexMetadata.getIndexMode() != IndexMode.TIME_SERIES) {
continue;
}
tsIndexCount++;
if (ds.isIndexManagedByDataStreamLifecycle(indexMetadata.getIndex(), ignored -> indexMetadata) && dlmRounds != null) {
dlmStats.trackIndex(ds, indexMetadata);
dlmStats.trackRounds(dlmRounds, ds, indexMetadata);
} else if (ilmAvailable && projectMetadata.isIndexManagedByILM(indexMetadata)) {
LifecyclePolicyMetadata policyMetadata = ilmMetadata.getPolicyMetadatas().get(indexMetadata.getLifecyclePolicyName());
if (policyMetadata == null) {
continue;
}
int rounds = 0;
for (Phase phase : policyMetadata.getPolicy().getPhases().values()) {
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
rounds++;
}
}
if (rounds > 0) {
ilmStats.trackPolicy(policyMetadata.getPolicy());
ilmStats.trackIndex(ds, indexMetadata);
ilmStats.trackRounds(rounds, ds, indexMetadata);
}
}
String interval = indexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.getKey());
if (interval != null) {
Long count = indicesByInterval.computeIfAbsent(interval, ignored -> 0L);
indicesByInterval.put(interval, count + 1);
}
}
}

final TimeSeriesFeatureSetUsage usage = ilmAvailable
? new TimeSeriesFeatureSetUsage(
tsDataStreamCount,
tsIndexCount,
ilmStats.getDownsamplingStats(),
ilmStats.getIlmPolicyStats(),
dlmStats.getDownsamplingStats(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why dlmStats?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean why I picked this variable name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this call is for ilmStats, surprised to see dlmStats for this arg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. There are two different constructors one that accepts stats for both ILM and DLM and one that only accepts DLM for the serverless use case, this was syntactic sugar to make more explicit that ILM stats would be null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a comment for clarity

indicesByInterval
)
: new TimeSeriesFeatureSetUsage(tsDataStreamCount, tsIndexCount, dlmStats.getDownsamplingStats(), indicesByInterval);
listener.onResponse(new XPackUsageFeatureResponse(usage));
}

private static class DownsamplingStatsTracker {
private long downsampledDataStreams = 0;
private long downsampledIndices = 0;
private final LongSummaryStatistics rounds = new LongSummaryStatistics();

void trackIndex(DataStream ds, IndexMetadata indexMetadata) {
if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) {
downsampledDataStreams++;
}
downsampledIndices++;
}

void trackRounds(int rounds, DataStream ds, IndexMetadata indexMetadata) {
// We want to track rounds per data stream, so we use the write index to determine the
// rounds applicable for this data stream
if (Objects.equals(indexMetadata.getIndex(), ds.getWriteIndex())) {
this.rounds.accept(rounds);
}
}

TimeSeriesFeatureSetUsage.DownsamplingFeatureStats getDownsamplingStats() {
return new TimeSeriesFeatureSetUsage.DownsamplingFeatureStats(
downsampledDataStreams,
downsampledIndices,
rounds.getMin(),
rounds.getAverage(),
rounds.getMax()
);
}
}

static class IlmDownsamplingStatsTracker extends DownsamplingStatsTracker {
private final Map<String, Map<String, Phase>> policies = new HashMap<>();

void trackPolicy(LifecyclePolicy ilmPolicy) {
policies.putIfAbsent(ilmPolicy.getName(), ilmPolicy.getPhases());
}

Map<String, Long> getIlmPolicyStats() {
if (policies.isEmpty()) {
return Map.of();
}
Map<String, Long> downsamplingPhases = new HashMap<>();
for (String ilmPolicy : policies.keySet()) {
for (Phase phase : policies.get(ilmPolicy).values()) {
if (phase.getActions().containsKey(DownsampleAction.NAME)) {
Long current = downsamplingPhases.computeIfAbsent(phase.getName(), ignored -> 0L);
downsamplingPhases.put(phase.getName(), current + 1);
}
}
}
return downsamplingPhases;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ private XPackUsageFeatureAction() {/* no instances */}
public static final ActionType<XPackUsageFeatureResponse> DATA_STREAM_LIFECYCLE = xpackUsageFeatureAction(
XPackField.DATA_STREAM_LIFECYCLE
);
public static final ActionType<XPackUsageFeatureResponse> TIME_SERIES_DATA_STREAMS = xpackUsageFeatureAction(
XPackField.TIME_SERIES_DATA_STREAMS
);
public static final ActionType<XPackUsageFeatureResponse> DATA_TIERS = xpackUsageFeatureAction(XPackField.DATA_TIERS);
public static final ActionType<XPackUsageFeatureResponse> AGGREGATE_METRIC = xpackUsageFeatureAction(XPackField.AGGREGATE_METRIC);
public static final ActionType<XPackUsageFeatureResponse> ARCHIVE = xpackUsageFeatureAction(XPackField.ARCHIVE);
Expand Down Expand Up @@ -91,7 +94,8 @@ private XPackUsageFeatureAction() {/* no instances */}
REMOTE_CLUSTERS,
ENTERPRISE_SEARCH,
UNIVERSAL_PROFILING,
LOGSDB
LOGSDB,
TIME_SERIES_DATA_STREAMS
);

public static ActionType<XPackUsageFeatureResponse> xpackUsageFeatureAction(String suffix) {
Expand Down
Loading