Skip to content

Commit 3e6100f

Browse files
committed
Index into TSDB
1 parent f77032e commit 3e6100f

File tree

9 files changed

+214
-18
lines changed

9 files changed

+214
-18
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ private static String maybeRewriteSingleAuthenticationHeaderForVersion(
195195
public static final String INFERENCE_ORIGIN = "inference";
196196
public static final String APM_ORIGIN = "apm";
197197
public static final String OTEL_ORIGIN = "otel";
198+
public static final String METRICSDB_ORIGIN = "metricsdb";
198199
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
199200
public static final String ESQL_ORIGIN = "esql";
200201

x-pack/plugin/metricsdb/build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,19 @@ apply plugin: 'elasticsearch.internal-cluster-test'
1111
esplugin {
1212
name = 'metricsdb'
1313
description = 'A plugin for metrics related functionality'
14-
classname ='org.elasticsearch.xpack.metrics.MetricsPlugin'
14+
classname ='org.elasticsearch.xpack.metrics.MetricsDBPlugin'
1515
extendedPlugins = ['x-pack-core']
1616
}
1717
base {
18-
archivesName = 'x-pack-metrics'
18+
archivesName = 'x-pack-metricsdb'
1919
}
2020

2121
dependencies {
2222
compileOnly project(path: xpackModule('core'))
2323
testImplementation project(':modules:data-streams')
2424
testImplementation project(':x-pack:plugin:esql')
2525
testImplementation project(':x-pack:plugin:esql-core')
26+
testImplementation project(':x-pack:plugin:mapper-version')
2627

2728
def otelVersion = "1.48.0"
2829
implementation "io.opentelemetry:opentelemetry-api:$otelVersion"

x-pack/plugin/metricsdb/src/internalClusterTest/java/org/elasticsearch/xpack/metrics/MetricsDBIndexingIT.java

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.api.common.Attributes;
1212
import io.opentelemetry.api.metrics.Meter;
1313
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
14+
import io.opentelemetry.sdk.common.Clock;
1415
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1516
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
1617
import io.opentelemetry.sdk.metrics.data.MetricData;
@@ -22,6 +23,7 @@
2223

2324
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
2425
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
26+
import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
2527
import org.elasticsearch.common.settings.Settings;
2628
import org.elasticsearch.common.transport.TransportAddress;
2729
import org.elasticsearch.core.TimeValue;
@@ -33,6 +35,7 @@
3335
import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
3436
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
3537
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
38+
import org.elasticsearch.xpack.versionfield.VersionFieldPlugin;
3639
import org.junit.Test;
3740

3841
import java.net.InetSocketAddress;
@@ -42,6 +45,8 @@
4245
import java.util.concurrent.Executors;
4346
import java.util.concurrent.TimeUnit;
4447

48+
import static org.hamcrest.Matchers.anEmptyMap;
49+
import static org.hamcrest.Matchers.empty;
4550
import static org.hamcrest.Matchers.emptyArray;
4651
import static org.hamcrest.Matchers.greaterThan;
4752
import static org.hamcrest.Matchers.is;
@@ -54,7 +59,13 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
5459

5560
@Override
5661
protected Collection<Class<? extends Plugin>> getPlugins() {
57-
return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class, MetricsDBPlugin.class, EsqlPlugin.class);
62+
return List.of(
63+
DataStreamsPlugin.class,
64+
InternalSettingsPlugin.class,
65+
MetricsDBPlugin.class,
66+
EsqlPlugin.class,
67+
VersionFieldPlugin.class
68+
);
5869
}
5970

6071
@Override
@@ -83,6 +94,14 @@ public void setUp() throws Exception {
8394
.build()
8495
)
8596
.build();
97+
assertBusy(() -> {
98+
GetComposableIndexTemplateAction.Request getReq = new GetComposableIndexTemplateAction.Request(
99+
TEST_REQUEST_TIMEOUT,
100+
"metricsdb@template"
101+
);
102+
var templates = client().execute(GetComposableIndexTemplateAction.INSTANCE, getReq).actionGet().indexTemplates();
103+
assertThat(templates, not(anEmptyMap()));
104+
});
86105
}
87106

88107
private int getHttpPort() {
@@ -111,16 +130,16 @@ public void testIngestMetricViaMeterProvider() throws Exception {
111130
.setUnit("By")
112131
.buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty()));
113132

114-
var result = meterProvider.forceFlush().join(1, TimeUnit.SECONDS);
133+
var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
115134
assertThat(result.isSuccess(), is(true));
116135

117136
admin().indices().prepareRefresh().execute().actionGet();
118-
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices();
137+
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metricsdb").get().indices();
119138
assertThat(indices, not(emptyArray()));
120139

121140
try (EsqlQueryResponse resp = query("""
122-
FROM metrics*
123-
| STATS avg(value_double) WHERE metric_name == "jvm.memory.total"
141+
FROM metricsdb
142+
| STATS avg(metric.value.double) WHERE metric.name == "jvm.memory.total"
124143
""")) {
125144
double avgJvmMemoryTotal = (double) resp.column(0).next();
126145
assertThat(avgJvmMemoryTotal, greaterThan(0.0));
@@ -138,25 +157,25 @@ public void testIngestMetricDataViaMetricExporter() throws Exception {
138157
ImmutableGaugeData.create(
139158
List.of(
140159
ImmutableDoublePointData.create(
141-
System.currentTimeMillis(),
142-
System.currentTimeMillis(),
160+
Clock.getDefault().now(),
161+
Clock.getDefault().now(),
143162
Attributes.empty(),
144163
Runtime.getRuntime().totalMemory()
145164
)
146165
)
147166
)
148167
);
149168

150-
var result = exporter.export(List.of(jvmMemoryMetricData)).join(1, TimeUnit.SECONDS);
169+
var result = exporter.export(List.of(jvmMemoryMetricData)).join(10, TimeUnit.SECONDS);
151170
assertThat(result.isSuccess(), is(true));
152171

153172
admin().indices().prepareRefresh().execute().actionGet();
154-
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices();
173+
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metricsdb").get().indices();
155174
assertThat(indices, not(emptyArray()));
156175

157176
try (EsqlQueryResponse resp = query("""
158-
FROM metrics*
159-
| STATS avg(value_double) WHERE metric_name == "jvm.memory.total"
177+
FROM metricsdb
178+
| STATS avg(metric.value.double) WHERE metric.name == "jvm.memory.total"
160179
""")) {
161180
double avgJvmMemoryTotal = (double) resp.column(0).next();
162181
assertThat(avgJvmMemoryTotal, greaterThan(0.0));
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.metrics;
9+
10+
import org.elasticsearch.client.internal.Client;
11+
import org.elasticsearch.cluster.project.ProjectResolver;
12+
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.threadpool.ThreadPool;
15+
import org.elasticsearch.xcontent.NamedXContentRegistry;
16+
import org.elasticsearch.xpack.core.ClientHelper;
17+
import org.elasticsearch.xpack.core.template.YamlTemplateRegistry;
18+
19+
public class MetricsDBIndexTemplateRegistry extends YamlTemplateRegistry {
20+
21+
public static final String OTEL_TEMPLATE_VERSION_VARIABLE = "xpack.metricsdb.template.version";
22+
23+
public MetricsDBIndexTemplateRegistry(
24+
Settings nodeSettings,
25+
ClusterService clusterService,
26+
ThreadPool threadPool,
27+
Client client,
28+
NamedXContentRegistry xContentRegistry,
29+
ProjectResolver projectResolver
30+
) {
31+
super(nodeSettings, clusterService, threadPool, client, xContentRegistry, projectResolver);
32+
}
33+
34+
@Override
35+
protected String getOrigin() {
36+
return ClientHelper.METRICSDB_ORIGIN;
37+
}
38+
39+
@Override
40+
public String getName() {
41+
return "MetricsDB";
42+
}
43+
44+
@Override
45+
protected String getVersionProperty() {
46+
return OTEL_TEMPLATE_VERSION_VARIABLE;
47+
}
48+
}

x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBPlugin.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@
77

88
package org.elasticsearch.xpack.metrics;
99

10+
import org.apache.lucene.util.SetOnce;
1011
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1112
import org.elasticsearch.cluster.node.DiscoveryNodes;
13+
import org.elasticsearch.cluster.service.ClusterService;
1214
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1315
import org.elasticsearch.common.settings.ClusterSettings;
1416
import org.elasticsearch.common.settings.IndexScopedSettings;
@@ -26,6 +28,9 @@
2628
import java.util.function.Supplier;
2729

2830
public class MetricsDBPlugin extends Plugin implements ActionPlugin {
31+
32+
final SetOnce<MetricsDBIndexTemplateRegistry> registry = new SetOnce<>();
33+
2934
@Override
3035
public Collection<RestHandler> getRestHandlers(
3136
Settings settings,
@@ -41,6 +46,26 @@ public Collection<RestHandler> getRestHandlers(
4146
return List.of(new MetricsDBRestAction());
4247
}
4348

49+
@Override
50+
public Collection<?> createComponents(PluginServices services) {
51+
Settings settings = services.environment().settings();
52+
ClusterService clusterService = services.clusterService();
53+
registry.set(
54+
new MetricsDBIndexTemplateRegistry(
55+
settings,
56+
clusterService,
57+
services.threadPool(),
58+
services.client(),
59+
services.xContentRegistry(),
60+
services.projectResolver()
61+
)
62+
);
63+
MetricsDBIndexTemplateRegistry registryInstance = registry.get();
64+
registryInstance.setEnabled(true);
65+
registryInstance.initialize();
66+
return List.of();
67+
}
68+
4469
@Override
4570
public Collection<ActionHandler> getActions() {
4671
return List.of(new ActionHandler(MetricsDBTransportAction.TYPE, MetricsDBTransportAction.class));

x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBTransportAction.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.common.io.stream.StreamInput;
4141
import org.elasticsearch.common.io.stream.StreamOutput;
4242
import org.elasticsearch.core.Nullable;
43+
import org.elasticsearch.ingest.IngestService;
4344
import org.elasticsearch.injection.guice.Inject;
4445
import org.elasticsearch.tasks.Task;
4546
import org.elasticsearch.threadpool.ThreadPool;
@@ -80,9 +81,10 @@ protected void doExecute(Task task, MetricsRequest request, ActionListener<Metri
8081
public void onResponse(BulkResponse bulkItemResponses) {
8182
MessageLite response;
8283
if (bulkItemResponses.hasFailures()) {
84+
long failures = Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count();
8385
response = ExportMetricsServiceResponse.newBuilder()
8486
.getPartialSuccessBuilder()
85-
.setRejectedDataPoints(Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count())
87+
.setRejectedDataPoints(failures)
8688
.setErrorMessage(bulkItemResponses.buildFailureMessage())
8789
.build();
8890
} else {
@@ -168,7 +170,13 @@ private void createNumberDataPointDocs(
168170
metric,
169171
dp
170172
);
171-
bulkRequestBuilder.add(client.prepareIndex("metricsdb").setCreate(true).setSource(xContentBuilder));
173+
bulkRequestBuilder.add(
174+
client.prepareIndex("metricsdb")
175+
.setCreate(true)
176+
.setRequireDataStream(true)
177+
.setPipeline(IngestService.NOOP_PIPELINE_NAME)
178+
.setSource(xContentBuilder)
179+
);
172180
}
173181
}
174182
}
@@ -197,7 +205,6 @@ private void buildDataPointDoc(
197205
builder.startObject("attributes");
198206
buildAttributes(builder, resourceAttributes);
199207
builder.endObject();
200-
builder.field("metric_name", metric.getName());
201208
builder.field("unit", metric.getUnit());
202209
builder.field("type");
203210
if (metric.getDataCase() == Metric.DataCase.SUM) {
@@ -217,11 +224,16 @@ private void buildDataPointDoc(
217224
metric.getExponentialHistogram().getAggregationTemporality().toString()
218225
);
219226
}
227+
builder.startObject("metric");
228+
builder.field("name", metric.getName());
229+
builder.startObject("value");
220230
switch (dp.getValueCase()) {
221-
case AS_DOUBLE -> builder.field("value_double", dp.getAsDouble());
222-
case AS_INT -> builder.field("value_long", dp.getAsInt());
231+
case AS_DOUBLE -> builder.field("double", dp.getAsDouble());
232+
case AS_INT -> builder.field("long", dp.getAsInt());
223233
}
224234
builder.endObject();
235+
builder.endObject();
236+
builder.endObject();
225237
}
226238

227239
private void buildAttributes(XContentBuilder builder, List<KeyValue> resourceAttributes) throws IOException {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
---
2+
version: ${xpack.metricsdb.template.version}
3+
index_patterns: ["metricsdb"]
4+
priority: 120
5+
data_stream: {}
6+
allow_auto_create: true
7+
_meta:
8+
description: default OpenTelemetry metrics template installed by x-pack
9+
managed: true
10+
template:
11+
settings:
12+
index.mode: time_series
13+
index.mapping.ignore_malformed: true
14+
mappings:
15+
date_detection: false
16+
dynamic: false
17+
properties:
18+
"@timestamp":
19+
type: date
20+
attributes:
21+
type: passthrough
22+
dynamic: true
23+
priority: 20
24+
time_series_dimension: true
25+
dropped_attributes_count:
26+
type: long
27+
scope:
28+
properties:
29+
name:
30+
type: keyword
31+
ignore_above: 1024
32+
time_series_dimension: true
33+
version:
34+
type: version
35+
schema_url:
36+
type: keyword
37+
ignore_above: 1024
38+
dropped_attributes_count:
39+
type: long
40+
attributes:
41+
type: passthrough
42+
dynamic: true
43+
priority: 30
44+
time_series_dimension: true
45+
resource:
46+
properties:
47+
schema_url:
48+
type: keyword
49+
ignore_above: 1024
50+
dropped_attributes_count:
51+
type: long
52+
attributes:
53+
type: passthrough
54+
dynamic: true
55+
priority: 40
56+
time_series_dimension: true
57+
start_timestamp:
58+
type: date
59+
metric:
60+
properties:
61+
name:
62+
type: keyword
63+
time_series_dimension: true
64+
value:
65+
properties:
66+
long:
67+
type: long
68+
double:
69+
type: double
70+
unit:
71+
type: keyword
72+
time_series_dimension: true
73+
ignore_above: 1024
74+
temporality:
75+
type: keyword
76+
time_series_dimension: true
77+
ignore_above: 1024
78+
type:
79+
type: keyword
80+
time_series_dimension: true
81+
ignore_above: 1024
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# "version" holds the version of the templates and ingest pipelines installed.
2+
# This must be increased whenever an existing template is
3+
# changed, in order for it to be updated on Elasticsearch upgrade.
4+
version: 1
5+
6+
index-templates:
7+
- metricsdb@template

0 commit comments

Comments
 (0)