From 2f024631cf74db7907374aec3f946486d23a5d06 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 5 Nov 2024 08:46:46 -0800 Subject: [PATCH] Add num docs and size to logsdb telemetry (#116128) Follow-up on #115994 to add telemetry for the total number of documents and size in bytes of logsdb indices. Relates #115994 --- docs/changelog/116128.yaml | 5 + .../monitor/metrics/IndicesMetricsIT.java | 16 ++ server/src/main/java/module-info.java | 1 + .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/action/ActionModule.java | 2 + .../metrics/IndexModeStatsActionType.java | 162 ++++++++++++++++++ .../monitor/metrics/IndexStats.java | 67 ++++++++ .../monitor/metrics/IndicesMetrics.java | 72 ++++---- .../xpack/core/XPackFeatures.java | 4 +- .../application/LogsDBFeatureSetUsage.java | 32 +++- .../logsdb/LogsDBUsageTransportAction.java | 37 +++- .../rest-api-spec/test/logsdb/10_usage.yml | 36 ++++ 12 files changed, 387 insertions(+), 48 deletions(-) create mode 100644 docs/changelog/116128.yaml create mode 100644 server/src/main/java/org/elasticsearch/monitor/metrics/IndexModeStatsActionType.java create mode 100644 server/src/main/java/org/elasticsearch/monitor/metrics/IndexStats.java diff --git a/docs/changelog/116128.yaml b/docs/changelog/116128.yaml new file mode 100644 index 0000000000000..7c38c0529c50d --- /dev/null +++ b/docs/changelog/116128.yaml @@ -0,0 +1,5 @@ +pr: 116128 +summary: Add num docs and size to logsdb telemetry +area: Logs +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java index ecbc23279f357..89cc8d08ed1e1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java @@ -11,9 +11,11 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.OnScriptError; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.indices.IndicesService; @@ -329,6 +331,10 @@ public void testIndicesMetrics() { equalTo(0L) ) ); + + verifyStatsPerIndexMode( + Map.of(IndexMode.STANDARD, numStandardDocs, IndexMode.LOGSDB, numLogsdbDocs, IndexMode.TIME_SERIES, numTimeSeriesDocs) + ); } void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map> matchers) { @@ -434,6 +440,16 @@ int populateLogsdbIndices(long numIndices) { return totalDocs; } + private void verifyStatsPerIndexMode(Map expectedDocs) { + var nodes = clusterService().state().nodes().stream().toArray(DiscoveryNode[]::new); + var request = new IndexModeStatsActionType.StatsRequest(nodes); + var resp = client().execute(IndexModeStatsActionType.TYPE, request).actionGet(); + var stats = resp.stats(); + for (Map.Entry e : expectedDocs.entrySet()) { + assertThat(stats.get(e.getKey()).numDocs(), equalTo(e.getValue())); + } + } + private Map parseMapping(String mapping) throws IOException { try (XContentParser parser = createParser(JsonXContent.jsonXContent, mapping)) { return parser.map(); diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 378a00f4ec81a..fd90933055483 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -469,5 +469,6 @@ org.elasticsearch.serverless.apifiltering; exports org.elasticsearch.lucene.spatial; exports org.elasticsearch.inference.configuration; + exports org.elasticsearch.monitor.metrics; } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 83d798a35975b..871e5aa48e3e4 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -187,6 +187,7 @@ static TransportVersion def(int id) { public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0); public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0); public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0); + public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 2d72f5d71ccda..eb0a005dc83ee 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -243,6 +243,7 @@ import org.elasticsearch.injection.guice.AbstractModule; import org.elasticsearch.injection.guice.TypeLiteral; import org.elasticsearch.injection.guice.multibindings.MapBinder; +import org.elasticsearch.monitor.metrics.IndexModeStatsActionType; import org.elasticsearch.persistent.CompletionPersistentTaskAction; import org.elasticsearch.persistent.RemovePersistentTaskAction; import org.elasticsearch.persistent.StartPersistentTaskAction; @@ -630,6 +631,7 @@ public void reg actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class); actions.register(RemoteClusterNodesAction.TYPE, RemoteClusterNodesAction.TransportAction.class); actions.register(TransportNodesStatsAction.TYPE, TransportNodesStatsAction.class); + actions.register(IndexModeStatsActionType.TYPE, IndexModeStatsActionType.TransportAction.class); actions.register(TransportNodesUsageAction.TYPE, TransportNodesUsageAction.class); actions.register(TransportNodesHotThreadsAction.TYPE, TransportNodesHotThreadsAction.class); actions.register(TransportListTasksAction.TYPE, TransportListTasksAction.class); diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/IndexModeStatsActionType.java b/server/src/main/java/org/elasticsearch/monitor/metrics/IndexModeStatsActionType.java new file mode 100644 index 0000000000000..45ba7b410ed30 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/IndexModeStatsActionType.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +public final class IndexModeStatsActionType extends ActionType { + public static final IndexModeStatsActionType TYPE = new IndexModeStatsActionType(); + + private IndexModeStatsActionType() { + super("cluster:monitor/nodes/index_mode_stats"); + } + + public static final class StatsRequest extends BaseNodesRequest { + public StatsRequest(String[] nodesIds) { + super(nodesIds); + } + + public StatsRequest(DiscoveryNode... concreteNodes) { + super(concreteNodes); + } + } + + public static final class StatsResponse extends BaseNodesResponse { + StatsResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert false : "must be local"; + throw new UnsupportedOperationException("must be local"); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + assert false : "must be local"; + throw new UnsupportedOperationException("must be local"); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + assert false : "must be local"; + throw new UnsupportedOperationException("must be local"); + } + + public Map stats() { + final Map stats = new EnumMap<>(IndexMode.class); + for (IndexMode mode : IndexMode.values()) { + stats.put(mode, new IndexStats()); + } + for (NodeResponse node : getNodes()) { + for (Map.Entry e : node.stats.entrySet()) { + stats.get(e.getKey()).add(e.getValue()); + } + } + return stats; + } + } + + public static final class NodeRequest extends TransportRequest { + NodeRequest() { + + } + + NodeRequest(StreamInput in) throws IOException { + super(in); + } + } + + public static class NodeResponse extends BaseNodeResponse { + private final Map stats; + + NodeResponse(DiscoveryNode node, Map stats) { + super(node); + this.stats = stats; + } + + NodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + stats = in.readMap(IndexMode::readFrom, IndexStats::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(stats, (o, m) -> IndexMode.writeTo(m, o), (o, s) -> s.writeTo(o)); + } + } + + public static class TransportAction extends TransportNodesAction { + private final IndicesService indicesService; + + @Inject + public TransportAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + IndicesService indicesService + ) { + super( + TYPE.name(), + clusterService, + transportService, + actionFilters, + NodeRequest::new, + transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT) + ); + this.indicesService = indicesService; + } + + @Override + protected StatsResponse newResponse(StatsRequest request, List nodeResponses, List failures) { + return new StatsResponse(ClusterName.DEFAULT, nodeResponses, failures); + } + + @Override + protected NodeRequest newNodeRequest(StatsRequest request) { + return new NodeRequest(); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeResponse(in, node); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request, Task task) { + return new NodeResponse(clusterService.localNode(), IndicesMetrics.getStatsWithoutCache(indicesService)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/IndexStats.java b/server/src/main/java/org/elasticsearch/monitor/metrics/IndexStats.java new file mode 100644 index 0000000000000..5d16150516465 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/IndexStats.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.search.stats.SearchStats; +import org.elasticsearch.index.shard.IndexingStats; + +import java.io.IOException; + +public final class IndexStats implements Writeable { + int numIndices = 0; + long numDocs = 0; + long numBytes = 0; + SearchStats.Stats search = new SearchStats().getTotal(); + IndexingStats.Stats indexing = new IndexingStats().getTotal(); + + IndexStats() { + + } + + IndexStats(StreamInput in) throws IOException { + this.numIndices = in.readVInt(); + this.numDocs = in.readVLong(); + this.numBytes = in.readVLong(); + this.search = SearchStats.Stats.readStats(in); + this.indexing = new IndexingStats.Stats(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(numIndices); + out.writeVLong(numDocs); + out.writeVLong(numBytes); + search.writeTo(out); + indexing.writeTo(out); + } + + void add(IndexStats other) { + this.numIndices += other.numIndices; + this.numDocs += other.numDocs; + this.numBytes += other.numBytes; + this.search.add(other.search); + this.indexing.add(other.indexing); + } + + public int numIndices() { + return numIndices; + } + + public long numDocs() { + return numDocs; + } + + public long numBytes() { + return numBytes; + } +} diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java index ba67bc03e1441..99011d101d342 100644 --- a/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java @@ -18,11 +18,9 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexingStats; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.telemetry.metric.LongWithAttributes; import org.elasticsearch.telemetry.metric.MeterRegistry; @@ -193,12 +191,36 @@ protected void doClose() throws IOException { }); } - static class IndexStats { - int numIndices = 0; - long numDocs = 0; - long numBytes = 0; - SearchStats.Stats search = new SearchStats().getTotal(); - IndexingStats.Stats indexing = new IndexingStats().getTotal(); + static Map getStatsWithoutCache(IndicesService indicesService) { + Map stats = new EnumMap<>(IndexMode.class); + for (IndexMode mode : IndexMode.values()) { + stats.put(mode, new IndexStats()); + } + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (indexShard.isSystem()) { + continue; // skip system indices + } + final ShardRouting shardRouting = indexShard.routingEntry(); + final IndexMode indexMode = indexShard.indexSettings().getMode(); + final IndexStats indexStats = stats.get(indexMode); + try { + if (shardRouting.primary() && shardRouting.recoverySource() == null) { + if (shardRouting.shardId().id() == 0) { + indexStats.numIndices++; + } + final DocsStats docStats = indexShard.docStats(); + indexStats.numDocs += docStats.getCount(); + indexStats.numBytes += docStats.getTotalSizeInBytes(); + indexStats.indexing.add(indexShard.indexingStats().getTotal()); + } + indexStats.search.add(indexShard.searchStats().getTotal()); + } catch (IllegalIndexShardStateException | AlreadyClosedException ignored) { + // ignored + } + } + } + return stats; } private static class IndicesStatsCache extends SingleObjectCache> { @@ -219,41 +241,9 @@ private static class IndicesStatsCache extends SingleObjectCache internalGetIndicesStats() { - Map stats = new EnumMap<>(IndexMode.class); - for (IndexMode mode : IndexMode.values()) { - stats.put(mode, new IndexStats()); - } - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - if (indexShard.isSystem()) { - continue; // skip system indices - } - final ShardRouting shardRouting = indexShard.routingEntry(); - final IndexMode indexMode = indexShard.indexSettings().getMode(); - final IndexStats indexStats = stats.get(indexMode); - try { - if (shardRouting.primary() && shardRouting.recoverySource() == null) { - if (shardRouting.shardId().id() == 0) { - indexStats.numIndices++; - } - final DocsStats docStats = indexShard.docStats(); - indexStats.numDocs += docStats.getCount(); - indexStats.numBytes += docStats.getTotalSizeInBytes(); - indexStats.indexing.add(indexShard.indexingStats().getTotal()); - } - indexStats.search.add(indexShard.searchStats().getTotal()); - } catch (IllegalIndexShardStateException | AlreadyClosedException ignored) { - // ignored - } - } - } - return stats; - } - @Override protected Map refresh() { - return refresh ? internalGetIndicesStats() : getNoRefresh(); + return refresh ? getStatsWithoutCache(indicesService) : getNoRefresh(); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackFeatures.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackFeatures.java index a7cf878511d78..b885a90c30e57 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackFeatures.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackFeatures.java @@ -21,13 +21,15 @@ */ public class XPackFeatures implements FeatureSpecification { public static final NodeFeature LOGSDB_TELEMETRY = new NodeFeature("logsdb_telemetry"); + public static final NodeFeature LOGSDB_TELMETRY_STATS = new NodeFeature("logsdb_telemetry_stats"); @Override public Set getFeatures() { return Set.of( NodesDataTiersUsageTransportAction.LOCALLY_PRECALCULATED_STATS_FEATURE, // Added in 8.12 License.INDEPENDENT_TRIAL_VERSION_FEATURE, // 8.14.0 - LOGSDB_TELEMETRY + LOGSDB_TELEMETRY, + LOGSDB_TELMETRY_STATS ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/application/LogsDBFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/application/LogsDBFeatureSetUsage.java index a3473bf6224a1..2758ef73a98da 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/application/LogsDBFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/application/LogsDBFeatureSetUsage.java @@ -20,11 +20,20 @@ public final class LogsDBFeatureSetUsage extends XPackFeatureUsage { private final int indicesCount; private final int indicesWithSyntheticSource; + private final long numDocs; + private final long sizeInBytes; public LogsDBFeatureSetUsage(StreamInput input) throws IOException { super(input); indicesCount = input.readVInt(); indicesWithSyntheticSource = input.readVInt(); + if (input.getTransportVersion().onOrAfter(TransportVersions.LOGSDB_TELEMETRY_STATS)) { + numDocs = input.readVLong(); + sizeInBytes = input.readVLong(); + } else { + numDocs = 0; + sizeInBytes = 0; + } } @Override @@ -32,12 +41,25 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeVInt(indicesCount); out.writeVInt(indicesWithSyntheticSource); + if (out.getTransportVersion().onOrAfter(TransportVersions.LOGSDB_TELEMETRY_STATS)) { + out.writeVLong(numDocs); + out.writeVLong(sizeInBytes); + } } - public LogsDBFeatureSetUsage(boolean available, boolean enabled, int indicesCount, int indicesWithSyntheticSource) { + public LogsDBFeatureSetUsage( + boolean available, + boolean enabled, + int indicesCount, + int indicesWithSyntheticSource, + long numDocs, + long sizeInBytes + ) { super(XPackField.LOGSDB, available, enabled); this.indicesCount = indicesCount; this.indicesWithSyntheticSource = indicesWithSyntheticSource; + this.numDocs = numDocs; + this.sizeInBytes = sizeInBytes; } @Override @@ -50,11 +72,13 @@ protected void innerXContent(XContentBuilder builder, Params params) throws IOEx super.innerXContent(builder, params); builder.field("indices_count", indicesCount); builder.field("indices_with_synthetic_source", indicesWithSyntheticSource); + builder.field("num_docs", numDocs); + builder.field("size_in_bytes", sizeInBytes); } @Override public int hashCode() { - return Objects.hash(available, enabled, indicesCount, indicesWithSyntheticSource); + return Objects.hash(available, enabled, indicesCount, indicesWithSyntheticSource, numDocs, sizeInBytes); } @Override @@ -69,6 +93,8 @@ public boolean equals(Object obj) { return Objects.equals(available, other.available) && Objects.equals(enabled, other.enabled) && Objects.equals(indicesCount, other.indicesCount) - && Objects.equals(indicesWithSyntheticSource, other.indicesWithSyntheticSource); + && Objects.equals(indicesWithSyntheticSource, other.indicesWithSyntheticSource) + && Objects.equals(numDocs, other.numDocs) + && Objects.equals(sizeInBytes, other.sizeInBytes); } } diff --git a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBUsageTransportAction.java b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBUsageTransportAction.java index 5c385d5920428..62e1eef3e0e97 100644 --- a/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBUsageTransportAction.java +++ b/x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsDBUsageTransportAction.java @@ -8,17 +8,22 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.monitor.metrics.IndexModeStatsActionType; import org.elasticsearch.protocol.xpack.XPackUsageRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.XPackFeatures; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureResponse; import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction; @@ -28,13 +33,17 @@ public class LogsDBUsageTransportAction extends XPackUsageFeatureTransportAction { private final ClusterService clusterService; + private final FeatureService featureService; + private final Client client; @Inject public LogsDBUsageTransportAction( TransportService transportService, ClusterService clusterService, + FeatureService featureService, ThreadPool threadPool, ActionFilters actionFilters, + Client client, IndexNameExpressionResolver indexNameExpressionResolver ) { super( @@ -46,6 +55,8 @@ public LogsDBUsageTransportAction( indexNameExpressionResolver ); this.clusterService = clusterService; + this.featureService = featureService; + this.client = client; } @Override @@ -66,8 +77,28 @@ protected void masterOperation( } } final boolean enabled = LogsDBPlugin.CLUSTER_LOGSDB_ENABLED.get(clusterService.getSettings()); - listener.onResponse( - new XPackUsageFeatureResponse(new LogsDBFeatureSetUsage(true, enabled, numIndices, numIndicesWithSyntheticSources)) - ); + if (featureService.clusterHasFeature(state, XPackFeatures.LOGSDB_TELMETRY_STATS)) { + final DiscoveryNode[] nodes = state.nodes().getDataNodes().values().toArray(DiscoveryNode[]::new); + final var statsRequest = new IndexModeStatsActionType.StatsRequest(nodes); + final int finalNumIndices = numIndices; + final int finalNumIndicesWithSyntheticSources = numIndicesWithSyntheticSources; + client.execute(IndexModeStatsActionType.TYPE, statsRequest, listener.map(statsResponse -> { + final var indexStats = statsResponse.stats().get(IndexMode.LOGSDB); + return new XPackUsageFeatureResponse( + new LogsDBFeatureSetUsage( + true, + enabled, + finalNumIndices, + finalNumIndicesWithSyntheticSources, + indexStats.numDocs(), + indexStats.numBytes() + ) + ); + })); + } else { + listener.onResponse( + new XPackUsageFeatureResponse(new LogsDBFeatureSetUsage(true, enabled, numIndices, numIndicesWithSyntheticSources, 0L, 0L)) + ); + } } } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_usage.yml index 63b9ba71510ed..731082378fe17 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_usage.yml @@ -1,5 +1,8 @@ --- logsdb usage: + - requires: + cluster_features: ["logsdb_telemetry_stats"] + reason: "requires stats" - do: indices.create: index: test1 @@ -7,20 +10,43 @@ logsdb usage: settings: index: mode: logsdb + - do: + bulk: + index: test1 + refresh: true + body: + - { "index": { } } + - { "@timestamp": "2024-02-12T10:30:00Z", "host.name": "foo" } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:31:00Z", "host.name": "bar" } - do: {xpack.usage: {}} - match: { logsdb.available: true } - match: { logsdb.indices_count: 1 } - match: { logsdb.indices_with_synthetic_source: 1 } + - match: { logsdb.num_docs: 2 } + - gt: { logsdb.size_in_bytes: 0} - do: indices.create: index: test2 + - do: + bulk: + index: test2 + refresh: true + body: + - { "index": { } } + - { "@timestamp": "2024-02-12T10:32:00Z", "host.name": "foo" } + - { "index": { } } + - { "@timestamp": "2024-02-12T10:33:00Z", "host.name": "baz" } + - do: {xpack.usage: {}} - match: { logsdb.available: true } - match: { logsdb.indices_count: 1 } - match: { logsdb.indices_with_synthetic_source: 1 } + - match: { logsdb.num_docs: 2 } + - gt: { logsdb.size_in_bytes: 0} - do: indices.create: @@ -31,7 +57,17 @@ logsdb usage: mode: logsdb mapping.source.mode: stored + - do: + bulk: + index: test3 + refresh: true + body: + - { "index": { } } + - { "@timestamp": "2024-02-12T10:32:00Z", "host.name": "foobar"} + - do: {xpack.usage: {}} - match: { logsdb.available: true } - match: { logsdb.indices_count: 2 } - match: { logsdb.indices_with_synthetic_source: 1 } + - match: { logsdb.num_docs: 3 } + - gt: { logsdb.size_in_bytes: 0}