From d88806b69de5b8b231d35aba8133b0fdac9e712b Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Wed, 20 Aug 2025 09:26:32 -1000 Subject: [PATCH 1/2] downsampleIT --- .../xpack/downsample/DownsampleIT.java | 70 +++++++++++++++++-- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 6eb3efcdeb735..0e8b98f3253f9 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.xpack.esql.action.EsqlQueryAction; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; import java.time.Instant; @@ -128,6 +129,10 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { "cpu": { "type": "double", "time_series_metric": "gauge" + }, + "request": { + "type": "double", + "time_series_metric": "counter" } } } @@ -144,6 +149,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { .field("host", randomFrom("host1", "host2", "host3")) .field("cluster", randomFrom("cluster1", "cluster2", "cluster3")) .field("cpu", randomDouble()) + .field("request", randomDoubleBetween(0, 100, true)) .endObject(); } catch (IOException e) { throw new RuntimeException(e); @@ -155,6 +161,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null))); List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); String sourceIndex = backingIndices.get(0); + String secondIndex = backingIndices.get(1); String interval = "5m"; String targetIndex = "downsample-" + interval + "-" + sourceIndex; // Set the source index to read-only state @@ -211,6 +218,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { .field("host", randomFrom("host1", "host2", "host3")) .field("cluster", randomFrom("cluster1", "cluster2", "cluster3")) .field("cpu", randomDouble()) + .field("request", randomDoubleBetween(0, 100, true)) .endObject(); } catch (IOException e) { throw new RuntimeException(e); @@ -226,9 +234,9 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { // Since the downsampled field (cpu) is downsampled in one index and not in the other, we want to confirm // first that the field is unsupported and has 2 original types - double and aggregate_metric_double - try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu")) { + try (var resp = esqlCommand("TS " + dataStreamName + " | KEEP @timestamp, host, cluster, cpu, request")) { var columns = resp.columns(); - assertThat(columns, hasSize(4)); + assertThat(columns, hasSize(5)); assertThat( resp.columns(), equalTo( @@ -236,17 +244,18 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { new ColumnInfoImpl("@timestamp", "date", null), new ColumnInfoImpl("host", "keyword", null), new ColumnInfoImpl("cluster", "keyword", null), - new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double")) + new ColumnInfoImpl("cpu", "unsupported", List.of("aggregate_metric_double", "double")), + new ColumnInfoImpl("request", "counter_double", null) ) ) ); } // test _over_time commands with implicit casting of aggregate_metric_double - for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) { - for (String outerCommand : List.of("min", "max", "sum", "count")) { + for (String outerCommand : List.of("min", "max", "sum", "count")) { + String expectedType = outerCommand.equals("count") ? "long" : "double"; + for (String innerCommand : List.of("min_over_time", "max_over_time", "avg_over_time", "count_over_time")) { String command = outerCommand + " (" + innerCommand + "(cpu))"; - String expectedType = innerCommand.equals("count_over_time") || outerCommand.equals("count") ? "long" : "double"; try (var resp = esqlCommand("TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) { var columns = resp.columns(); assertThat(columns, hasSize(3)); @@ -254,7 +263,7 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { resp.columns(), equalTo( List.of( - new ColumnInfoImpl(command, expectedType, null), + new ColumnInfoImpl(command, innerCommand.equals("count_over_time") ? "long" : expectedType, null), new ColumnInfoImpl("cluster", "keyword", null), new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null) ) @@ -263,6 +272,53 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { // TODO: verify the numbers are accurate } } + // tests on non-downsampled index + // TODO: combine with above when support for aggregate_metric_double + implicit casting is added + // TODO: add to counter tests below when support for counters is added + for (String innerCommand : List.of("first_over_time", "last_over_time")) { + String command = outerCommand + " (" + innerCommand + "(cpu))"; + try (var resp = esqlCommand("TS " + secondIndex + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)")) { + var columns = resp.columns(); + assertThat(columns, hasSize(3)); + assertThat( + "resp is " + resp, + columns, + equalTo( + List.of( + new ColumnInfoImpl(command, expectedType, null), + new ColumnInfoImpl("cluster", "keyword", null), + new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null) + ) + ) + ); + } + } + + // tests on counter types + // TODO: remove hard-coded pragmas + var ratePragmas = new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()); + for (String innerCommand : List.of("rate")) { + String command = outerCommand + " (" + innerCommand + "(request))"; + String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)"; + try ( + var resp = client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(esqlQuery).pragmas(ratePragmas)) + .actionGet(30, TimeUnit.SECONDS) + ) { + var columns = resp.columns(); + assertThat(columns, hasSize(3)); + assertThat( + "resp is " + resp, + columns, + equalTo( + List.of( + new ColumnInfoImpl(command, expectedType, null), + new ColumnInfoImpl("cluster", "keyword", null), + new ColumnInfoImpl("bucket(@timestamp, 1 hour)", "date", null) + ) + ) + ); + } + } } } From c9ae9be1ee820fbdf01285d07fc6a4328d5966e7 Mon Sep 17 00:00:00 2001 From: Larisa Motova Date: Mon, 25 Aug 2025 19:34:27 -1000 Subject: [PATCH 2/2] assume snapshot for rate portion --- .../java/org/elasticsearch/xpack/downsample/DownsampleIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 0e8b98f3253f9..396011f2e834c 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.downsample; +import org.elasticsearch.Build; import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; @@ -296,7 +297,9 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { // tests on counter types // TODO: remove hard-coded pragmas + assumeTrue("query pragmas require snapshot build", Build.current().isSnapshot()); var ratePragmas = new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()); + for (String innerCommand : List.of("rate")) { String command = outerCommand + " (" + innerCommand + "(request))"; String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)";