diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
index 4688a825ede8b..3fc5a4b77c090 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
@@ -41,6 +41,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -55,10 +56,22 @@
 @SuppressWarnings("unchecked")
 @ESIntegTestCase.ClusterScope(maxNumDataNodes = 1)
 public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
-    private static final Long NUM_DOCS = 2000L;
-    private static final Long TIME_RANGE_SECONDS = 3600L;
+    private static final Long NUM_DOCS = 500L;
+    private static final Long TIME_RANGE_SECONDS = 900L;
     private static final String DATASTREAM_NAME = "tsit_ds";
     private static final Integer SECONDS_IN_WINDOW = 60;
+    private static final List<Tuple<String, Integer>> WINDOW_OPTIONS = List.of(
+        Tuple.tuple("10 seconds", 10),
+        Tuple.tuple("30 seconds", 30),
+        Tuple.tuple("1 minute", 60),
+        Tuple.tuple("2 minutes", 120),
+        Tuple.tuple("3 minutes", 180),
+        Tuple.tuple("5 minutes", 300),
+        Tuple.tuple("10 minutes", 600),
+        Tuple.tuple("30 minutes", 1800),
+        Tuple.tuple("1 hour", 3600)
+    );
+
     private List<XContentBuilder> documents;
     private TSDataGenerationHelper dataGenerationHelper;
 
@@ -124,7 +137,7 @@ static List<Integer> valuesInWindow(List<Map<String, Object>> pointsInGroup, Str
         return values;
     }
 
-    static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseries(
+    static Map<String, List<Tuple<String, Tuple<Instant, Double>>>> groupByTimeseries(
         List<Map<String, Object>> pointsInGroup,
         String metricName
     ) {
@@ -136,19 +149,30 @@ static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseri
                 .map(entry -> entry.getKey() + ":" + entry.getValue())
                 .collect(Collectors.joining(","));
             var docTs = Instant.parse((String) doc.get("@timestamp"));
-            var docValue = (Integer) ((Map<String, Object>) doc.get("metrics")).get(metricName);
+            var docValue = switch (((Map<String, Object>) doc.get("metrics")).get(metricName)) {
+                case Integer i -> i.doubleValue();
+                case Long l -> l.doubleValue();
+                case Float f -> f.doubleValue();
+                case Double d -> d;
+                default -> throw new IllegalStateException(
+                    "Unexpected value type: "
+                        + ((Map<String, Object>) doc.get("metrics")).get(metricName)
+                        + " of class "
+                        + ((Map<String, Object>) doc.get("metrics")).get(metricName).getClass()
+                );
+            };
             return new Tuple<>(docKey, new Tuple<>(docTs, docValue));
         })
             .collect(Collectors.groupingBy(Tuple::v1));
     }
 
     static Object aggregatePerTimeseries(
-        Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> timeseries,
+        Map<String, List<Tuple<String, Tuple<Instant, Double>>>> timeseries,
         Agg crossAgg,
         Agg timeseriesAgg
     ) {
         var res = timeseries.values().stream().map(timeseriesList -> {
-            List<Integer> values = timeseriesList.stream().map(t -> t.v2().v2()).collect(Collectors.toList());
+            List<Double> values = timeseriesList.stream().map(t -> t.v2().v2()).collect(Collectors.toList());
             return aggregateValuesInWindow(values, timeseriesAgg);
         }).filter(Objects::nonNull).toList();
 
@@ -157,27 +181,20 @@ static Object aggregatePerTimeseries(
         }
 
         return switch (crossAgg) {
-            case MAX -> res.isEmpty()
-                ? null
-                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).max().orElseThrow()).longValue();
-            case MIN -> res.isEmpty()
-                ? null
-                : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).min().orElseThrow()).longValue();
+            case MAX -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).max().orElseThrow();
+            case MIN -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).min().orElseThrow();
             case AVG -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
-            case SUM -> res.isEmpty() ? null : Double.valueOf(res.stream().mapToDouble(Double::doubleValue).sum()).longValue();
+            case SUM -> res.isEmpty() ? null : res.stream().mapToDouble(Double::doubleValue).sum();
             case COUNT -> Integer.toUnsignedLong(res.size());
         };
     }
 
-    static Double aggregateValuesInWindow(List<Integer> values, Agg agg) {
-        // if (values.isEmpty()) {
-        //     throw new IllegalArgumentException("No values to aggregate for " + agg + " operation");
-        // }
+    static Double aggregateValuesInWindow(List<Double> values, Agg agg) {
         return switch (agg) {
-            case MAX -> Double.valueOf(values.stream().max(Integer::compareTo).orElseThrow());
-            case MIN -> Double.valueOf(values.stream().min(Integer::compareTo).orElseThrow());
-            case AVG -> values.stream().mapToDouble(Integer::doubleValue).average().orElseThrow();
-            case SUM -> values.isEmpty() ? null : values.stream().mapToDouble(Integer::doubleValue).sum();
+            case MAX -> values.stream().max(Double::compareTo).orElseThrow();
+            case MIN -> values.stream().min(Double::compareTo).orElseThrow();
+            case AVG -> values.stream().mapToDouble(Double::doubleValue).average().orElseThrow();
+            case SUM -> values.isEmpty() ? null : values.stream().mapToDouble(Double::doubleValue).sum();
             case COUNT -> (double) values.size();
         };
     }
@@ -230,7 +247,7 @@ public int compareToFindingMax(RateRange o) {
     record RateStats(Long count, RateRange max, RateRange avg, RateRange min, RateRange sum) {}
 
     static RateStats calculateRateAggregation(
-        Collection<List<Tuple<String, Tuple<Instant, Integer>>>> allTimeseries,
+        Collection<List<Tuple<String, Tuple<Instant, Double>>>> allTimeseries,
         Integer secondsInWindow
     ) {
         List<RateRange> allRates = allTimeseries.stream().map(timeseries -> {
@@ -241,9 +258,9 @@ static RateStats calculateRateAggregation(
             timeseries.sort((t1, t2) -> t1.v2().v1().compareTo(t2.v2().v1()));
             var firstTs = timeseries.getFirst().v2().v1();
             var lastTs = timeseries.getLast().v2().v1();
-            Integer lastValue = null;
+            Double lastValue = null;
             Double counterGrowth = 0.0;
-            for (Tuple<String, Tuple<Instant, Integer>> point : timeseries) {
+            for (Tuple<String, Tuple<Instant, Double>> point : timeseries) {
                 var currentValue = point.v2().v2();
                 if (currentValue == null) {
                     throw new IllegalArgumentException("Null value in counter timeseries");
@@ -266,7 +283,7 @@ static RateStats calculateRateAggregation(
             );
         }).filter(Objects::nonNull).toList();
         if (allRates.isEmpty()) {
-            return new RateStats(0L, null, null, null, new RateRange(0.0, 0.0));
+            return new RateStats(0L, null, null, null, null);
         }
         return new RateStats(
             (long) allRates.size(),
@@ -348,31 +365,39 @@ void assertNoFailedWindows(List<String> failedWindows, List<List<Object>> rows)
      * the same values from the documents in the group.
      */
     public void testRateGroupBySubset() {
-        var dimensions = ESTestCase.randomNonEmptySubsetOf(dataGenerationHelper.attributesForMetrics);
-        var dimensionsStr = dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
+        var window = ESTestCase.randomFrom(WINDOW_OPTIONS);
+        var windowSize = window.v2();
+        var windowStr = window.v1();
+        var dimensions = ESTestCase.randomSubsetOf(dataGenerationHelper.attributesForMetrics);
+        var dimensionsStr = dimensions.isEmpty()
+            ? ""
+            : ", " + dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
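+        // Bucket width and grouping dimensions are randomized, so each run checks a different combination.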
         try (var resp = run(String.format(Locale.ROOT, """
             TS %s
-            | STATS count(rate(metrics.counter_hdd.bytes.read)),
-                max(rate(metrics.counter_hdd.bytes.read)),
-                avg(rate(metrics.counter_hdd.bytes.read)),
-                min(rate(metrics.counter_hdd.bytes.read))
-                BY tbucket=bucket(@timestamp, 1 minute), %s
+            | STATS count(rate(metrics.counterl_hdd.bytes.read)),
+                max(rate(metrics.counterl_hdd.bytes.read)),
+                avg(rate(metrics.counterl_hdd.bytes.read)),
+                min(rate(metrics.counterl_hdd.bytes.read)),
+                sum(rate(metrics.counterl_hdd.bytes.read))
+                BY tbucket=bucket(@timestamp, %s) %s
             | SORT tbucket
             | LIMIT 1000
-            """, DATASTREAM_NAME, dimensionsStr))) {
+            """, DATASTREAM_NAME, windowStr, dimensionsStr))) {
             List<List<Object>> rows = consumeRows(resp);
             List<String> failedWindows = new ArrayList<>();
-            var groups = groupedRows(documents, dimensions, SECONDS_IN_WINDOW);
+            var groups = groupedRows(documents, dimensions, windowSize);
             for (List<Object> row : rows) {
-                var rowKey = getRowKey(row, dimensions, 4);
+                var rowKey = getRowKey(row, dimensions, 5);
                 var windowDataPoints = groups.get(rowKey);
-                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
-                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
+                var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
+                var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), windowSize);
                 try {
                     assertThat(row.getFirst(), equalTo(rateAgg.count));
                     checkWithin((Double) row.get(1), rateAgg.max);
                     checkWithin((Double) row.get(2), rateAgg.avg);
                     checkWithin((Double) row.get(3), rateAgg.min);
+                    checkWithin((Double) row.get(4), rateAgg.sum);
                 } catch (AssertionError e) {
                     failedWindows.add("Failed for row:\n" + row + "\nWanted: " + rateAgg + "\nException: " + e.getMessage());
                 }
@@ -391,10 +415,10 @@ public void testRateGroupByNothing() {
         var groups = groupedRows(documents, List.of(), 60);
         try (var resp = run(String.format(Locale.ROOT, """
             TS %s
-            | STATS count(rate(metrics.counter_hdd.bytes.read)),
-                max(rate(metrics.counter_hdd.bytes.read)),
-                avg(rate(metrics.counter_hdd.bytes.read)),
-                min(rate(metrics.counter_hdd.bytes.read))
+            | STATS count(rate(metrics.counterl_hdd.bytes.read)),
+                max(rate(metrics.counterl_hdd.bytes.read)),
+                avg(rate(metrics.counterl_hdd.bytes.read)),
+                min(rate(metrics.counterl_hdd.bytes.read))
                 BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
             | LIMIT 1000
@@ -404,7 +428,7 @@ public void testRateGroupByNothing() {
         for (List<Object> row : rows) {
             var windowStart = windowStart(row.get(4), SECONDS_IN_WINDOW);
             var windowDataPoints = groups.get(List.of(Long.toString(windowStart)));
-            var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counter_hdd.bytes.read");
+            var docsPerTimeseries = groupByTimeseries(windowDataPoints, "counterl_hdd.bytes.read");
             var rateAgg = calculateRateAggregation(docsPerTimeseries.values(), SECONDS_IN_WINDOW);
             try {
                 assertThat(row.getFirst(), equalTo(rateAgg.count));
@@ -419,6 +443,78 @@ public void testRateGroupByNothing() {
         }
     }
 
+    public void testGaugeGroupByRandomAndRandomAgg() {
+        var randomWindow = ESTestCase.randomFrom(WINDOW_OPTIONS);
+        var windowSize = randomWindow.v2();
+        var windowStr = randomWindow.v1();
+        var dimensions = ESTestCase.randomSubsetOf(dataGenerationHelper.attributesForMetrics);
+        var dimensionsStr = dimensions.isEmpty()
+            ? ""
+            : ", " + dimensions.stream().map(d -> "attributes." + d).collect(Collectors.joining(", "));
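+        // Pick a random gauge metric plus two random aggregations: an outer cross-series one and an inner *_over_time rollup.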
+        var metricName = ESTestCase.randomFrom(List.of("gaugel_hdd.bytes.used", "gauged_cpu.percent"));
+        var selectedAggs = ESTestCase.randomSubsetOf(2, Agg.values());
+        var aggExpression = String.format(
+            Locale.ROOT,
+            "%s(%s_over_time(metrics.%s))",
+            selectedAggs.get(0),
+            selectedAggs.get(1),
+            metricName
+        );
+        // TODO: Remove WHERE clause after fixing https://github.com/elastic/elasticsearch/issues/129524
+        var query = String.format(Locale.ROOT, """
+            TS %s
+            | WHERE metrics.%s IS NOT NULL
+            | STATS
+                %s
+                BY tbucket=bucket(@timestamp, %s) %s
+            | SORT tbucket
+            | LIMIT 1000""", DATASTREAM_NAME, metricName, aggExpression, windowStr, dimensionsStr);
+        try (EsqlQueryResponse resp = run(query)) {
+            var groups = groupedRows(documents, dimensions, windowSize);
+            List<List<Object>> rows = consumeRows(resp);
+            for (List<Object> row : rows) {
+                var rowKey = getRowKey(row, dimensions, 1);
+                var tsGroups = groupByTimeseries(groups.get(rowKey), metricName);
+                Object expectedVal = aggregatePerTimeseries(tsGroups, selectedAggs.get(0), selectedAggs.get(1));
+                Double actualVal = switch (row.get(0)) {
+                    case Long l -> l.doubleValue();
+                    case Double d -> d;
+                    case null -> null;
+                    default -> throw new IllegalStateException(
+                        "Unexpected value type: " + row.get(0) + " of class " + row.get(0).getClass()
+                    );
+                };
+                try {
+                    switch (expectedVal) {
+                        case Double dVal -> assertThat(actualVal, closeTo(dVal, dVal * 0.01));
+                        case Long lVal -> assertThat(actualVal, closeTo(lVal.doubleValue(), lVal * 0.01));
+                        case null -> assertThat(actualVal, equalTo(null));
+                        default -> throw new IllegalStateException(
+                            "Unexpected value type: " + expectedVal + " of class " + expectedVal.getClass()
+                        );
+                    }
+                } catch (AssertionError e) {
+                    throw new AssertionError(
+                        "Failed for aggregations:\n"
+                            + selectedAggs
+                            + " with total dimensions for grouping: "
+                            + dimensions.size()
+                            + " on metric "
+                            + metricName
+                            + "\nWanted val: "
+                            + expectedVal
+                            + "\nGot val: "
+                            + actualVal
+                            + "\nException: "
+                            + e.getMessage(),
+                        e
+                    );
+                }
+            }
+        }
+    }
+
     /**
      * This test validates Gauge metrics aggregation with grouping by time bucket and a subset of dimensions.
      * The subset of dimensions is a random subset of the dimensions present in the data.
@@ -431,13 +526,13 @@ public void testGroupBySubset() {
         try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
             TS %s
             | STATS
-                values(metrics.gauge_hdd.bytes.used),
-                max(max_over_time(metrics.gauge_hdd.bytes.used)),
-                min(min_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(count_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
-                count(count_over_time(metrics.gauge_hdd.bytes.used))
+                values(metrics.gaugel_hdd.bytes.used),
+                max(max_over_time(metrics.gaugel_hdd.bytes.used)),
+                min(min_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(count_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(sum_over_time(metrics.gaugel_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gaugel_hdd.bytes.used)),
+                count(count_over_time(metrics.gaugel_hdd.bytes.used))
                 BY tbucket=bucket(@timestamp, 1 minute), %s
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME, dimensionsStr))) {
@@ -445,8 +540,8 @@ public void testGroupBySubset() {
             List<List<Object>> rows = consumeRows(resp);
             for (List<Object> row : rows) {
                 var rowKey = getRowKey(row, dimensions, 7);
-                var tsGroups = groupByTimeseries(groups.get(rowKey), "gauge_hdd.bytes.used");
-                var docValues = valuesInWindow(groups.get(rowKey), "gauge_hdd.bytes.used");
+                var tsGroups = groupByTimeseries(groups.get(rowKey), "gaugel_hdd.bytes.used");
+                var docValues = valuesInWindow(groups.get(rowKey), "gaugel_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
                         (Collection<Long>) row.getFirst(),
@@ -476,13 +571,13 @@ public void testGroupByNothing() {
         try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """
             TS %s
             | STATS
-                values(metrics.gauge_hdd.bytes.used),
-                max(max_over_time(metrics.gauge_hdd.bytes.used)),
-                min(min_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(count_over_time(metrics.gauge_hdd.bytes.used)),
-                sum(sum_over_time(metrics.gauge_hdd.bytes.used)),
-                avg(avg_over_time(metrics.gauge_hdd.bytes.used)),
-                count(count_over_time(metrics.gauge_hdd.bytes.used))
+                values(metrics.gaugel_hdd.bytes.used),
+                max(max_over_time(metrics.gaugel_hdd.bytes.used)),
+                min(min_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(count_over_time(metrics.gaugel_hdd.bytes.used)),
+                sum(sum_over_time(metrics.gaugel_hdd.bytes.used)),
+                avg(avg_over_time(metrics.gaugel_hdd.bytes.used)),
+                count(count_over_time(metrics.gaugel_hdd.bytes.used))
                 BY tbucket=bucket(@timestamp, 1 minute)
             | SORT tbucket
             | LIMIT 1000""", DATASTREAM_NAME))) {
@@ -490,8 +585,8 @@ public void testGroupByNothing() {
             var groups = groupedRows(documents, List.of(), 60);
             for (List<Object> row : rows) {
                 var windowStart = windowStart(row.get(7), 60);
-                List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
-                var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gauge_hdd.bytes.used");
+                List<Integer> docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used");
+                var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used");
                 if (row.get(0) instanceof List) {
                     assertThat(
                         (Collection<Long>) row.get(0),
@@ -500,10 +595,16 @@ public void testGroupByNothing() {
                 } else {
                     assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? null : docValues.getFirst().longValue()));
                 }
-                assertThat(row.get(1), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
-                assertThat(row.get(2), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
-                assertThat(row.get(3), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
-                assertThat(row.get(4), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
+                Function<Object, Double> toDouble = cell -> switch (cell) {
+                    case Long l -> l.doubleValue();
+                    case Double d -> d;
+                    case null -> null;
+                    default -> throw new IllegalStateException("Unexpected value type: " + cell + " of class " + cell.getClass());
+                };
+                assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX)));
+                assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN)));
+                assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT)));
+                assertThat(toDouble.apply(row.get(4)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM)));
                 var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG);
                 assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01));
                 // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
index 5b0914244fe7c..8273512b2f1fa 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TSDataGenerationHelper.java
@@ -119,12 +119,12 @@ private static Object randomDimensionValue(String dimensionName) {
             (ignored) -> {
                 var res = new HashMap<String, Object>();
 
-                res.put("counter_hdd.bytes.read", Randomness.get().nextLong(0, 1000L));
+                res.put("counterl_hdd.bytes.read", Randomness.get().nextLong(0, 1000L));
                 // Counter metrics
                 switch (ESTestCase.randomIntBetween(0, 2)) {
-                    case 0 -> res.put("counter_kwh.consumed", Randomness.get().nextDouble(0, 1000000));
-                    case 1 -> res.put("gauge_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L));
-                    case 2 -> res.put("gauge_cpu.percent", Randomness.get().nextDouble(0, 100));
+                    case 0 -> res.put("counterd_kwh.consumed", Randomness.get().nextDouble(0, 1000000));
+                    case 1 -> res.put("gaugel_hdd.bytes.used", Randomness.get().nextLong(0, 1000000000L));
+                    case 2 -> res.put("gauged_cpu.percent", Randomness.get().nextDouble(0, 100));
                 }
                 return res;
             }
@@ -146,19 +146,19 @@ private static Object randomDimensionValue(String dimensionName) {
         List.of(
             Map.of(
                 "counter_long",
-                Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "long", "time_series_metric", "counter"))
+                Map.of("path_match", "metrics.counterl_*", "mapping", Map.of("type", "long", "time_series_metric", "counter"))
             ),
             Map.of(
                 "counter_double",
-                Map.of("path_match", "metrics.counter_*", "mapping", Map.of("type", "double", "time_series_metric", "counter"))
+                Map.of("path_match", "metrics.counterd_*", "mapping", Map.of("type", "double", "time_series_metric", "counter"))
             ),
             Map.of(
                 "gauge_long",
-                Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge"))
+                Map.of("path_match", "metrics.gaugel_*", "mapping", Map.of("type", "long", "time_series_metric", "gauge"))
             ),
Map.of( "gauge_double", - Map.of("path_match", "metrics.gauge_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge")) + Map.of("path_match", "metrics.gauged_*", "mapping", Map.of("type", "double", "time_series_metric", "gauge")) ) ) );