Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.Build;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
Expand All @@ -27,6 +28,7 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -128,6 +130,10 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
"cpu": {
"type": "double",
"time_series_metric": "gauge"
},
"request": {
"type": "double",
"time_series_metric": "counter"
}
}
}
Expand All @@ -144,6 +150,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
.field("host", randomFrom("host1", "host2", "host3"))
.field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
.field("cpu", randomDouble())
.field("request", randomDoubleBetween(0, 100, true))
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -155,6 +162,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)));
List<String> backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2);
String sourceIndex = backingIndices.get(0);
String secondIndex = backingIndices.get(1);
String interval = "5m";
String targetIndex = "downsample-" + interval + "-" + sourceIndex;
// Set the source index to read-only state
Expand Down Expand Up @@ -211,6 +219,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
.field("host", randomFrom("host1", "host2", "host3"))
.field("cluster", randomFrom("cluster1", "cluster2", "cluster3"))
.field("cpu", randomDouble())
.field("request", randomDoubleBetween(0, 100, true))
.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -226,35 +235,36 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {

// Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm
// first that the field is unsupported and has 2 original types - double and aggregate_metric_double
try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) {
try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu, request")) {
var columns = resp.columns();
assertThat(columns, hasSize(4));
assertThat(columns, hasSize(5));
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl("@timestamp", "date", null),
new ColumnInfoImpl("host", "keyword", null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double"))
new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double")),
new ColumnInfoImpl("request", "counter_double", null)
)
)
);
}

// test _over_time commands with implicit casting of aggregate_metric_double
for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) {
for (String outerCommand : List.of("min", "max", "sum", "count")) {
for (String outerCommand : List.of("min", "max", "sum", "count")) {
String expectedType = outerCommand.equals("count") ? "long" : "double";
for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) {
String command = outerCommand + " (" + innerCommand + "(cpu))";
String expectedType = innerCommand.equals("count_over_time") || outerCommand.equals("count") ? "long" : "double";
try (var resp = esqlCommand("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
var columns = resp.columns();
assertThat(columns, hasSize(3));
assertThat(
resp.columns(),
equalTo(
List.of(
new ColumnInfoImpl(command, expectedType, null),
new ColumnInfoImpl(command, innerCommand.equals("count_over_time") ? "long" : expectedType, null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
)
Expand All @@ -263,6 +273,55 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
// TODO: verify the numbers are accurate
}
}
// tests on non-downsampled index
// TODO: combine with above when support for aggregate_metric_double + implicit casting is added
// TODO: add to counter tests below when support for counters is added
for (String innerCommand : List.of("first_over_time", "last_over_time")) {
String command = outerCommand + " (" + innerCommand + "(cpu))";
try (var resp = esqlCommand("TS " + secondIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) {
var columns = resp.columns();
assertThat(columns, hasSize(3));
assertThat(
"resp is " + resp,
columns,
equalTo(
List.of(
new ColumnInfoImpl(command, expectedType, null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
)
)
);
}
}

// tests on counter types
// TODO: remove hard-coded pragmas
assumeTrue("query pragmas require snapshot build", Build.current().isSnapshot());
var ratePragmas = new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build());

for (String innerCommand : List.of("rate")) {
String command = outerCommand + " (" + innerCommand + "(request))";
String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)";
try (
var resp = client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(esqlQuery).pragmas(ratePragmas))
.actionGet(30, TimeUnit.SECONDS)
) {
var columns = resp.columns();
assertThat(columns, hasSize(3));
assertThat(
"resp is " + resp,
columns,
equalTo(
List.of(
new ColumnInfoImpl(command, expectedType, null),
new ColumnInfoImpl("cluster", "keyword", null),
new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null)
)
)
);
}
}
}
}

Expand Down