Skip to content

Commit fa6c529

Browse files
authored
Add num docs and size to logsdb telemetry (#116128)
Follow-up on #115994 to add telemetry for the total number of documents and size in bytes of logsdb indices. Relates #115994
1 parent bcc87b7 commit fa6c529

File tree

12 files changed

+387
-48
lines changed

12 files changed

+387
-48
lines changed

docs/changelog/116128.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116128
2+
summary: Add num docs and size to logsdb telemetry
3+
area: Logs
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@
1111

1212
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.common.settings.Setting;
1516
import org.elasticsearch.common.settings.Settings;
1617
import org.elasticsearch.core.TimeValue;
18+
import org.elasticsearch.index.IndexMode;
1719
import org.elasticsearch.index.mapper.OnScriptError;
1820
import org.elasticsearch.index.query.RangeQueryBuilder;
1921
import org.elasticsearch.indices.IndicesService;
@@ -329,6 +331,10 @@ public void testIndicesMetrics() {
329331
equalTo(0L)
330332
)
331333
);
334+
335+
verifyStatsPerIndexMode(
336+
Map.of(IndexMode.STANDARD, numStandardDocs, IndexMode.LOGSDB, numLogsdbDocs, IndexMode.TIME_SERIES, numTimeSeriesDocs)
337+
);
332338
}
333339

334340
void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map<String, Matcher<Long>> matchers) {
@@ -434,6 +440,16 @@ int populateLogsdbIndices(long numIndices) {
434440
return totalDocs;
435441
}
436442

443+
private void verifyStatsPerIndexMode(Map<IndexMode, Long> expectedDocs) {
444+
var nodes = clusterService().state().nodes().stream().toArray(DiscoveryNode[]::new);
445+
var request = new IndexModeStatsActionType.StatsRequest(nodes);
446+
var resp = client().execute(IndexModeStatsActionType.TYPE, request).actionGet();
447+
var stats = resp.stats();
448+
for (Map.Entry<IndexMode, Long> e : expectedDocs.entrySet()) {
449+
assertThat(stats.get(e.getKey()).numDocs(), equalTo(e.getValue()));
450+
}
451+
}
452+
437453
private Map<String, Object> parseMapping(String mapping) throws IOException {
438454
try (XContentParser parser = createParser(JsonXContent.jsonXContent, mapping)) {
439455
return parser.map();

server/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,5 +470,6 @@
470470
org.elasticsearch.serverless.apifiltering;
471471
exports org.elasticsearch.lucene.spatial;
472472
exports org.elasticsearch.inference.configuration;
473+
exports org.elasticsearch.monitor.metrics;
473474

474475
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ static TransportVersion def(int id) {
187187
public static final TransportVersion QUERY_RULES_RETRIEVER = def(8_782_00_0);
188188
public static final TransportVersion ESQL_CCS_EXEC_INFO_WITH_FAILURES = def(8_783_00_0);
189189
public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0);
190+
public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0);
190191

191192
/*
192193
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@
243243
import org.elasticsearch.injection.guice.AbstractModule;
244244
import org.elasticsearch.injection.guice.TypeLiteral;
245245
import org.elasticsearch.injection.guice.multibindings.MapBinder;
246+
import org.elasticsearch.monitor.metrics.IndexModeStatsActionType;
246247
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
247248
import org.elasticsearch.persistent.RemovePersistentTaskAction;
248249
import org.elasticsearch.persistent.StartPersistentTaskAction;
@@ -628,6 +629,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
628629
actions.register(TransportNodesFeaturesAction.TYPE, TransportNodesFeaturesAction.class);
629630
actions.register(RemoteClusterNodesAction.TYPE, RemoteClusterNodesAction.TransportAction.class);
630631
actions.register(TransportNodesStatsAction.TYPE, TransportNodesStatsAction.class);
632+
actions.register(IndexModeStatsActionType.TYPE, IndexModeStatsActionType.TransportAction.class);
631633
actions.register(TransportNodesUsageAction.TYPE, TransportNodesUsageAction.class);
632634
actions.register(TransportNodesHotThreadsAction.TYPE, TransportNodesHotThreadsAction.class);
633635
actions.register(TransportListTasksAction.TYPE, TransportListTasksAction.class);
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.monitor.metrics;
11+
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.FailedNodeException;
14+
import org.elasticsearch.action.support.ActionFilters;
15+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
16+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
17+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
18+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
19+
import org.elasticsearch.cluster.ClusterName;
20+
import org.elasticsearch.cluster.node.DiscoveryNode;
21+
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.index.IndexMode;
25+
import org.elasticsearch.indices.IndicesService;
26+
import org.elasticsearch.injection.guice.Inject;
27+
import org.elasticsearch.tasks.Task;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.TransportRequest;
30+
import org.elasticsearch.transport.TransportService;
31+
32+
import java.io.IOException;
33+
import java.util.EnumMap;
34+
import java.util.List;
35+
import java.util.Map;
36+
37+
public final class IndexModeStatsActionType extends ActionType<IndexModeStatsActionType.StatsResponse> {
38+
public static final IndexModeStatsActionType TYPE = new IndexModeStatsActionType();
39+
40+
private IndexModeStatsActionType() {
41+
super("cluster:monitor/nodes/index_mode_stats");
42+
}
43+
44+
public static final class StatsRequest extends BaseNodesRequest {
45+
public StatsRequest(String[] nodesIds) {
46+
super(nodesIds);
47+
}
48+
49+
public StatsRequest(DiscoveryNode... concreteNodes) {
50+
super(concreteNodes);
51+
}
52+
}
53+
54+
public static final class StatsResponse extends BaseNodesResponse<NodeResponse> {
55+
StatsResponse(ClusterName clusterName, List<NodeResponse> nodes, List<FailedNodeException> failures) {
56+
super(clusterName, nodes, failures);
57+
}
58+
59+
@Override
60+
public void writeTo(StreamOutput out) throws IOException {
61+
assert false : "must be local";
62+
throw new UnsupportedOperationException("must be local");
63+
}
64+
65+
@Override
66+
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
67+
assert false : "must be local";
68+
throw new UnsupportedOperationException("must be local");
69+
}
70+
71+
@Override
72+
protected void writeNodesTo(StreamOutput out, List<NodeResponse> nodes) throws IOException {
73+
assert false : "must be local";
74+
throw new UnsupportedOperationException("must be local");
75+
}
76+
77+
public Map<IndexMode, IndexStats> stats() {
78+
final Map<IndexMode, IndexStats> stats = new EnumMap<>(IndexMode.class);
79+
for (IndexMode mode : IndexMode.values()) {
80+
stats.put(mode, new IndexStats());
81+
}
82+
for (NodeResponse node : getNodes()) {
83+
for (Map.Entry<IndexMode, IndexStats> e : node.stats.entrySet()) {
84+
stats.get(e.getKey()).add(e.getValue());
85+
}
86+
}
87+
return stats;
88+
}
89+
}
90+
91+
public static final class NodeRequest extends TransportRequest {
92+
NodeRequest() {
93+
94+
}
95+
96+
NodeRequest(StreamInput in) throws IOException {
97+
super(in);
98+
}
99+
}
100+
101+
public static class NodeResponse extends BaseNodeResponse {
102+
private final Map<IndexMode, IndexStats> stats;
103+
104+
NodeResponse(DiscoveryNode node, Map<IndexMode, IndexStats> stats) {
105+
super(node);
106+
this.stats = stats;
107+
}
108+
109+
NodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
110+
super(in, node);
111+
stats = in.readMap(IndexMode::readFrom, IndexStats::new);
112+
}
113+
114+
@Override
115+
public void writeTo(StreamOutput out) throws IOException {
116+
super.writeTo(out);
117+
out.writeMap(stats, (o, m) -> IndexMode.writeTo(m, o), (o, s) -> s.writeTo(o));
118+
}
119+
}
120+
121+
public static class TransportAction extends TransportNodesAction<StatsRequest, StatsResponse, NodeRequest, NodeResponse, Void> {
122+
private final IndicesService indicesService;
123+
124+
@Inject
125+
public TransportAction(
126+
ClusterService clusterService,
127+
TransportService transportService,
128+
ActionFilters actionFilters,
129+
IndicesService indicesService
130+
) {
131+
super(
132+
TYPE.name(),
133+
clusterService,
134+
transportService,
135+
actionFilters,
136+
NodeRequest::new,
137+
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
138+
);
139+
this.indicesService = indicesService;
140+
}
141+
142+
@Override
143+
protected StatsResponse newResponse(StatsRequest request, List<NodeResponse> nodeResponses, List<FailedNodeException> failures) {
144+
return new StatsResponse(ClusterName.DEFAULT, nodeResponses, failures);
145+
}
146+
147+
@Override
148+
protected NodeRequest newNodeRequest(StatsRequest request) {
149+
return new NodeRequest();
150+
}
151+
152+
@Override
153+
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
154+
return new NodeResponse(in, node);
155+
}
156+
157+
@Override
158+
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
159+
return new NodeResponse(clusterService.localNode(), IndicesMetrics.getStatsWithoutCache(indicesService));
160+
}
161+
}
162+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.monitor.metrics;
11+
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.io.stream.Writeable;
15+
import org.elasticsearch.index.search.stats.SearchStats;
16+
import org.elasticsearch.index.shard.IndexingStats;
17+
18+
import java.io.IOException;
19+
20+
public final class IndexStats implements Writeable {
21+
int numIndices = 0;
22+
long numDocs = 0;
23+
long numBytes = 0;
24+
SearchStats.Stats search = new SearchStats().getTotal();
25+
IndexingStats.Stats indexing = new IndexingStats().getTotal();
26+
27+
IndexStats() {
28+
29+
}
30+
31+
IndexStats(StreamInput in) throws IOException {
32+
this.numIndices = in.readVInt();
33+
this.numDocs = in.readVLong();
34+
this.numBytes = in.readVLong();
35+
this.search = SearchStats.Stats.readStats(in);
36+
this.indexing = new IndexingStats.Stats(in);
37+
}
38+
39+
@Override
40+
public void writeTo(StreamOutput out) throws IOException {
41+
out.writeVInt(numIndices);
42+
out.writeVLong(numDocs);
43+
out.writeVLong(numBytes);
44+
search.writeTo(out);
45+
indexing.writeTo(out);
46+
}
47+
48+
void add(IndexStats other) {
49+
this.numIndices += other.numIndices;
50+
this.numDocs += other.numDocs;
51+
this.numBytes += other.numBytes;
52+
this.search.add(other.search);
53+
this.indexing.add(other.indexing);
54+
}
55+
56+
public int numIndices() {
57+
return numIndices;
58+
}
59+
60+
public long numDocs() {
61+
return numDocs;
62+
}
63+
64+
public long numBytes() {
65+
return numBytes;
66+
}
67+
}

0 commit comments

Comments
 (0)