Skip to content

Commit 898303e

Browse files
committed
Index via bulk requests, testing via OTel SDK and ES|QL
1 parent e147378 commit 898303e

File tree

4 files changed

+176
-398
lines changed

4 files changed

+176
-398
lines changed

x-pack/plugin/metricsdb/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ base {
2121
dependencies {
2222
compileOnly project(path: xpackModule('core'))
2323
testImplementation project(':modules:data-streams')
24-
24+
testImplementation project(':x-pack:plugin:esql')
25+
testImplementation project(':x-pack:plugin:esql-core')
2526

2627
def otelVersion = "1.48.0"
2728
implementation "io.opentelemetry:opentelemetry-api:$otelVersion"

x-pack/plugin/metricsdb/src/internalClusterTest/java/org/elasticsearch/xpack/metrics/MetricsDBIndexingIT.java

Lines changed: 29 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,38 @@
1515

1616
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
1717
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
18-
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
19-
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
20-
import org.elasticsearch.cluster.metadata.Template;
21-
import org.elasticsearch.common.compress.CompressedXContent;
2218
import org.elasticsearch.common.settings.Settings;
2319
import org.elasticsearch.common.transport.TransportAddress;
20+
import org.elasticsearch.core.TimeValue;
2421
import org.elasticsearch.datastreams.DataStreamsPlugin;
2522
import org.elasticsearch.http.HttpInfo;
26-
import org.elasticsearch.index.query.QueryBuilders;
2723
import org.elasticsearch.plugins.Plugin;
2824
import org.elasticsearch.test.ESSingleNodeTestCase;
2925
import org.elasticsearch.test.InternalSettingsPlugin;
26+
import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
27+
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
28+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
3029
import org.junit.Test;
3130

3231
import java.net.InetSocketAddress;
32+
import java.time.Duration;
3333
import java.util.Collection;
3434
import java.util.List;
35+
import java.util.concurrent.Executors;
3536
import java.util.concurrent.TimeUnit;
3637

38+
import static org.hamcrest.Matchers.emptyArray;
3739
import static org.hamcrest.Matchers.greaterThan;
3840
import static org.hamcrest.Matchers.is;
41+
import static org.hamcrest.Matchers.not;
3942

4043
public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
4144

4245
private SdkMeterProvider meterProvider;
4346

4447
@Override
4548
protected Collection<Class<? extends Plugin>> getPlugins() {
46-
return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class, MetricsDBPlugin.class);
49+
return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class, MetricsDBPlugin.class, EsqlPlugin.class);
4750
}
4851

4952
@Override
@@ -60,45 +63,20 @@ protected boolean addMockHttpTransport() {
6063
return false;
6164
}
6265

63-
6466
@Override
6567
public void setUp() throws Exception {
6668
super.setUp();
67-
var mappingTemplate = """
68-
{
69-
"_doc":{
70-
"properties": {
71-
"@timestamp" : {
72-
"type": "date"
73-
},
74-
"dummy": {
75-
"type": "keyword",
76-
"time_series_dimension": true
77-
}
78-
}
79-
}
80-
}""";
81-
82-
var request = new TransportPutComposableIndexTemplateAction.Request("id");
83-
request.indexTemplate(
84-
ComposableIndexTemplate.builder()
85-
.indexPatterns(List.of("metricsdb"))
86-
.template(
87-
new Template(Settings.builder().put("index.mode", "time_series").build(), new CompressedXContent(mappingTemplate), null)
88-
)
89-
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false))
90-
.priority(500L)
91-
.build()
92-
);
93-
var res = client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
94-
assertThat(res.isAcknowledged(), is(true));
95-
9669
OtlpHttpMetricExporter exporter = OtlpHttpMetricExporter.builder()
9770
.setEndpoint("http://localhost:" + getHttpPort() + "/_otlp/v1/metrics")
9871
.build();
9972

10073
meterProvider = SdkMeterProvider.builder()
101-
.registerMetricReader(PeriodicMetricReader.builder(exporter).build())
74+
.registerMetricReader(
75+
PeriodicMetricReader.builder(exporter)
76+
.setExecutor(Executors.newScheduledThreadPool(0))
77+
.setInterval(Duration.ofNanos(Long.MAX_VALUE))
78+
.build()
79+
)
10280
.build();
10381
}
10482

@@ -132,13 +110,19 @@ public void testIngestMetric() {
132110
assertThat(result.isSuccess(), is(true));
133111

134112
admin().indices().prepareRefresh().execute().actionGet();
135-
long hits = client().prepareSearch("metricsdb")
136-
.setQuery(QueryBuilders.matchAllQuery())
137-
.execute()
138-
.actionGet()
139-
.getHits()
140-
.getTotalHits()
141-
.value();
142-
assertThat(hits, greaterThan(0L));
113+
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices();
114+
assertThat(indices, not(emptyArray()));
115+
116+
try (EsqlQueryResponse resp = query("""
117+
FROM metrics*
118+
| STATS avg(value_double) WHERE metric_name == "jvm.memory.total"
119+
""")) {
120+
double avgJvmMemoryTotal = (double) resp.column(0).next();
121+
assertThat(avgJvmMemoryTotal, greaterThan(0.0));
122+
}
123+
}
124+
125+
protected EsqlQueryResponse query(String esql) {
126+
return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(esql).execute().actionGet();
143127
}
144128
}

x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBRestAction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
1111

1212
import com.google.protobuf.CodedOutputStream;
13+
import com.google.protobuf.MessageLite;
1314

1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.client.internal.node.NodeClient;
@@ -57,18 +58,17 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
5758
ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) {
5859
@Override
5960
public RestResponse buildResponse(MetricsDBTransportAction.MetricsResponse r) throws Exception {
60-
return successResponse();
61+
return successResponse(r.getResponse());
6162
}
6263
})
6364
);
6465
}
6566

6667
// according to spec empty requests are successful
67-
return channel -> channel.sendResponse(successResponse());
68+
return channel -> channel.sendResponse(successResponse(ExportMetricsServiceResponse.newBuilder().build()));
6869
}
6970

70-
private RestResponse successResponse() throws IOException {
71-
var response = ExportMetricsServiceResponse.newBuilder().build();
71+
private RestResponse successResponse(MessageLite response) throws IOException {
7272
var responseBytes = ByteBuffer.allocate(response.getSerializedSize());
7373
response.writeTo(CodedOutputStream.newInstance(responseBytes));
7474

0 commit comments

Comments
 (0)