diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-last-over-time.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-last-over-time.csv-spec index 5c07265b466a4..024d8e8149f5e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-last-over-time.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-last-over-time.csv-spec @@ -17,6 +17,24 @@ clients:double | cluster:keyword | time_bucket:datetime 357.0 | staging | 2024-05-10T00:03:00.000Z ; +implicit_last_over_time_of_integer +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | STATS clients = avg(network.eth0.currently_connected_clients) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT time_bucket, cluster | LIMIT 10; + +clients:double | cluster:keyword | time_bucket:datetime +429.0 | prod | 2024-05-10T00:00:00.000Z +615.5 | staging | 2024-05-10T00:00:00.000Z +396.5 | prod | 2024-05-10T00:01:00.000Z +440.0 | qa | 2024-05-10T00:01:00.000Z +632.5 | prod | 2024-05-10T00:02:00.000Z +565.0 | qa | 2024-05-10T00:02:00.000Z +205.0 | staging | 2024-05-10T00:02:00.000Z +742.0 | prod | 2024-05-10T00:03:00.000Z +454.0 | qa | 2024-05-10T00:03:00.000Z +357.0 | staging | 2024-05-10T00:03:00.000Z +; + last_over_time_of_long required_capability: metrics_command required_capability: last_over_time @@ -35,6 +53,24 @@ bytes:double | cluster:keyword | time_bucket:datetime 612.5 | staging | 2024-05-10T00:03:00.000Z ; +implicit_last_over_time_of_long +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | STATS bytes = avg(network.bytes_in) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT time_bucket, cluster | LIMIT 10; + +bytes:double | cluster:keyword | time_bucket:datetime +677.0 | prod | 2024-05-10T00:00:00.000Z +586.0 | staging | 2024-05-10T00:00:00.000Z +628.5 | prod | 2024-05-10T00:01:00.000Z +538.5 | qa | 
2024-05-10T00:01:00.000Z +612.0 | prod | 2024-05-10T00:02:00.000Z +749.0 | qa | 2024-05-10T00:02:00.000Z +382.5 | staging | 2024-05-10T00:02:00.000Z +970.0 | prod | 2024-05-10T00:03:00.000Z +373.0 | qa | 2024-05-10T00:03:00.000Z +612.5 | staging | 2024-05-10T00:03:00.000Z +; + last_over_time_with_filtering required_capability: metrics_command required_capability: last_over_time @@ -52,6 +88,25 @@ tx:long | cluster:keyword | time_bucket:datetime 238 | staging | 2024-05-10T00:20:00.000Z ; + +implicit_last_over_time_with_filtering +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | WHERE pod == "one" | STATS tx = sum(network.bytes_in) BY cluster, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, cluster | LIMIT 10; + +tx:long | cluster:keyword | time_bucket:datetime +3 | prod | 2024-05-10T00:00:00.000Z +830 | qa | 2024-05-10T00:00:00.000Z +753 | staging | 2024-05-10T00:00:00.000Z +542 | prod | 2024-05-10T00:10:00.000Z +187 | qa | 2024-05-10T00:10:00.000Z +4 | staging | 2024-05-10T00:10:00.000Z +931 | prod | 2024-05-10T00:20:00.000Z +206 | qa | 2024-05-10T00:20:00.000Z +238 | staging | 2024-05-10T00:20:00.000Z +; + + last_over_time_older_than_10d required_capability: metrics_command required_capability: last_over_time @@ -65,6 +120,19 @@ cost:double | pod:keyword | time_bucket:datetime 1038.0 | three | 2024-05-10T00:10:00.000Z ; +implicit_last_over_time_older_than_10d +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | WHERE cluster == "qa" AND @timestamp < now() - 10 day | STATS cost = avg(network.eth0.rx) BY pod, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, pod | LIMIT 5; + +cost:double | pod:keyword | time_bucket:datetime +818.0 | one | 2024-05-10T00:00:00.000Z +529.0 | three | 2024-05-10T00:00:00.000Z +620.0 | two | 2024-05-10T00:00:00.000Z +1262.0 | one | 2024-05-10T00:10:00.000Z +1038.0 | three | 2024-05-10T00:10:00.000Z +; + eval_on_last_over_time 
required_capability: metrics_command required_capability: last_over_time @@ -82,6 +150,23 @@ max_bytes:double | cluster:keyword | time_bucket:datetime | kb_minus_offset 81.33333333333333 | staging | 2024-05-10T00:20:00.000Z | -0.01866666666666667 ; +implicit_eval_on_last_over_time +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | STATS max_bytes = avg(network.bytes_in) BY cluster, time_bucket = bucket(@timestamp, 10minute) | EVAL kb_minus_offset = (max_bytes - 100) / 1000.0 | LIMIT 10 | SORT time_bucket, cluster ; + +max_bytes:double | cluster:keyword | time_bucket:datetime | kb_minus_offset:double +225.0 | prod | 2024-05-10T00:00:00.000Z | 0.125 +485.6666666666667 | qa | 2024-05-10T00:00:00.000Z | 0.3856666666666667 +572.6666666666666 | staging | 2024-05-10T00:00:00.000Z | 0.4726666666666666 +517.6666666666666 | prod | 2024-05-10T00:10:00.000Z | 0.41766666666666663 +426.6666666666667 | qa | 2024-05-10T00:10:00.000Z | 0.32666666666666666 +482.3333333333333 | staging | 2024-05-10T00:10:00.000Z | 0.3823333333333333 +839.0 | prod | 2024-05-10T00:20:00.000Z | 0.739 +697.0 | qa | 2024-05-10T00:20:00.000Z | 0.597 +81.33333333333333 | staging | 2024-05-10T00:20:00.000Z | -0.01866666666666667 +; + last_over_time_multi_values required_capability: metrics_command required_capability: last_over_time @@ -101,6 +186,26 @@ events:long | pod:keyword | time_bucket:datetime 9 | three | 2024-05-10T00:02:00.000Z ; + +implicit_last_over_time_multi_values +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | WHERE @timestamp < "2024-05-10T00:10:00.000Z" | STATS events = sum(events_received) by pod, time_bucket = bucket(@timestamp, 1minute) | SORT events desc, pod, time_bucket | LIMIT 10; + +events:long | pod:keyword | time_bucket:datetime +18 | one | 2024-05-10T00:01:00.000Z +16 | one | 2024-05-10T00:08:00.000Z +12 | one | 2024-05-10T00:03:00.000Z +12 | three | 2024-05-10T00:00:00.000Z +12 | two | 
2024-05-10T00:09:00.000Z +10 | three | 2024-05-10T00:06:00.000Z +10 | two | 2024-05-10T00:02:00.000Z +10 | two | 2024-05-10T00:04:00.000Z +9 | one | 2024-05-10T00:09:00.000Z +9 | three | 2024-05-10T00:02:00.000Z +; + + last_over_time_null_values required_capability: metrics_command required_capability: last_over_time @@ -120,6 +225,24 @@ null | two | 2024-05-10T00:13:00.000Z 7 | three | 2024-05-10T00:12:00.000Z ; +implicit_last_over_time_null_values +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | WHERE @timestamp > "2024-05-10T00:10:00.000Z" and @timestamp < "2024-05-10T00:15:00.000Z" | STATS events = sum(events_received) by pod, time_bucket = bucket(@timestamp, 1minute) | SORT events desc, pod, time_bucket | LIMIT 10; + +events:long | pod:keyword | time_bucket:datetime +null | one | 2024-05-10T00:12:00.000Z +null | two | 2024-05-10T00:13:00.000Z +20 | two | 2024-05-10T00:14:00.000Z +18 | two | 2024-05-10T00:12:00.000Z +16 | one | 2024-05-10T00:13:00.000Z +16 | one | 2024-05-10T00:14:00.000Z +11 | one | 2024-05-10T00:10:00.000Z +9 | one | 2024-05-10T00:11:00.000Z +9 | three | 2024-05-10T00:13:00.000Z +7 | three | 2024-05-10T00:12:00.000Z +; + last_over_time_all_value_types required_capability: metrics_command required_capability: last_over_time @@ -138,3 +261,21 @@ events:long | pod:keyword | time_bucket:datetime 5 | two | 2024-05-10T00:20:00.000Z ; + +implicit_last_over_time_all_value_types +required_capability: metrics_command +required_capability: implicit_last_over_time +TS k8s | STATS events = sum(events_received) by pod, time_bucket = bucket(@timestamp, 10minute) | SORT events desc, pod, time_bucket | LIMIT 10 ; + +events:long | pod:keyword | time_bucket:datetime +21 | three | 2024-05-10T00:10:00.000Z +20 | one | 2024-05-10T00:10:00.000Z +15 | one | 2024-05-10T00:20:00.000Z +15 | three | 2024-05-10T00:20:00.000Z +13 | two | 2024-05-10T00:10:00.000Z +12 | two | 2024-05-10T00:00:00.000Z +9 | one | 
2024-05-10T00:00:00.000Z +9 | three | 2024-05-10T00:00:00.000Z +5 | two | 2024-05-10T00:20:00.000Z +; + diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index a903503e147e0..da92b4dbb99c3 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -49,10 +49,19 @@ max_cost: double maxRateAndBytes required_capability: metrics_command +required_capability: implicit_last_over_time TS k8s | STATS max(60 * rate(network.total_bytes_in)), max(network.bytes_in); max(60 * rate(network.total_bytes_in)): double | max(network.bytes_in): long -790.4235090751944 | 1021 +790.4235090751944 | 972 +; + +maxRateAndBytesExplicit +required_capability: metrics_command +TS k8s | STATS max(60 * rate(network.total_bytes_in)), max(last_over_time(network.bytes_in)); + +max(60 * rate(network.total_bytes_in)): double | max(last_over_time(network.bytes_in)): long +790.4235090751944 | 972 ; maxRateAndMarkupBytes @@ -60,15 +69,31 @@ required_capability: metrics_command TS k8s | STATS max(rate(network.total_bytes_in)), max(network.bytes_in * 1.05); max(rate(network.total_bytes_in)): double | max(network.bytes_in * 1.05): double - 13.17372515125324 | 1072.05 + 13.17372515125324 | 1020.6 +; + +maxRateAndMarkupBytesExplicit +required_capability: metrics_command +TS k8s | STATS max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in) * 1.05); + +max(rate(network.total_bytes_in)): double | max_bytes_in: double + 13.17372515125324 | 1020.6 +; + +maxRateAndLastBytesIn +required_capability: metrics_command +TS k8s | STATS max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in * 1.05)); + +max(rate(network.total_bytes_in)): double | max_bytes_in: double + 13.17372515125324 | 1020.6 ; maxRateAndBytesAndCost 
required_capability: metrics_command -TS k8s | STATS max(rate(network.total_bytes_in)), max(network.bytes_in), max(rate(network.total_cost)); +TS k8s | STATS max(rate(network.total_bytes_in)), max(max_over_time(network.bytes_in)), max(rate(network.total_cost)); -max(rate(network.total_bytes_in)): double| max(network.bytes_in): long| max(rate(network.total_cost)): double -13.17372515125324 | 1021 | 0.16151685393258428 +max(rate(network.total_bytes_in)): double| max(max_over_time(network.bytes_in)): long| max(rate(network.total_cost)): double +13.17372515125324 | 1021 | 0.16151685393258428 ; sumRate @@ -144,15 +169,15 @@ max(rate(network.total_bytes_in)):double | time_bucket:datetime | cluster:ke BytesAndCostByBucketAndCluster required_capability: metrics_command -TS k8s | STATS max(rate(network.total_bytes_in)), max(network.cost) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6; - -max(rate(network.total_bytes_in)): double | max(network.cost): double | time_bucket:date | cluster: keyword -6.980660660660663 | 10.75 | 2024-05-10T00:20:00.000Z | prod -4.05 | 11.875 | 2024-05-10T00:20:00.000Z | qa -3.19 | 9.5 | 2024-05-10T00:20:00.000Z | staging -11.860805860805861 | 12.375 | 2024-05-10T00:15:00.000Z | prod -23.702205882352942 | 12.125 | 2024-05-10T00:15:00.000Z | qa -7.784911616161616 | 11.5 | 2024-05-10T00:15:00.000Z | staging +TS k8s | STATS max(rate(network.total_bytes_in)), max(max_over_time(network.cost)) BY time_bucket = bucket(@timestamp,5minute), cluster | SORT time_bucket DESC, cluster | LIMIT 6; + +max(rate(network.total_bytes_in)): double | max(max_over_time(network.cost)): double | time_bucket:date | cluster: keyword +6.980660660660663 | 10.75 | 2024-05-10T00:20:00.000Z | prod +4.05 | 11.875 | 2024-05-10T00:20:00.000Z | qa +3.19 | 9.5 | 2024-05-10T00:20:00.000Z | staging +11.860805860805861 | 12.375 | 2024-05-10T00:15:00.000Z | prod +23.702205882352942 | 12.125 | 2024-05-10T00:15:00.000Z | qa +7.784911616161616 
| 11.5 | 2024-05-10T00:15:00.000Z | staging ; oneRateWithBucketAndClusterThenFilter diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java index 514c36c24a443..4ae65eadf88e5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java @@ -48,7 +48,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.closeTo; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -528,7 +527,6 @@ public void testGroupBySubset() { try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """ TS %s | STATS - values(metrics.gaugel_hdd.bytes.used), max(max_over_time(metrics.gaugel_hdd.bytes.used)), min(min_over_time(metrics.gaugel_hdd.bytes.used)), sum(count_over_time(metrics.gaugel_hdd.bytes.used)), @@ -541,29 +539,20 @@ public void testGroupBySubset() { var groups = groupedRows(documents, dimensions, 60); List> rows = consumeRows(resp); for (List row : rows) { - var rowKey = getRowKey(row, dimensions, 7); + var rowKey = getRowKey(row, dimensions, 6); var tsGroups = groupByTimeseries(groups.get(rowKey), "gaugel_hdd.bytes.used"); - var docValues = valuesInWindow(groups.get(rowKey), "gaugel_hdd.bytes.used"); - if (row.get(0) instanceof List) { - assertThat( - (Collection) row.getFirst(), - containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new)) - ); - } else { - assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? 
null : docValues.getFirst().longValue())); - } Function toDouble = cell -> switch (cell) { case Long l -> l.doubleValue(); case Double d -> d; case null -> null; default -> throw new IllegalStateException("Unexpected value type: " + cell + " of class " + cell.getClass()); }; - assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX))); - assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN))); - assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT))); - assertThat(toDouble.apply(row.get(4)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM))); + assertThat(toDouble.apply(row.get(0)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX))); + assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN))); + assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT))); + assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM))); var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG); - assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01)); + assertThat((Double) row.get(4), row.get(4) == null ? 
equalTo(null) : closeTo(avg, avg * 0.01)); // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue())); } } @@ -579,7 +568,6 @@ public void testGroupByNothing() { try (EsqlQueryResponse resp = run(String.format(Locale.ROOT, """ TS %s | STATS - values(metrics.gaugel_hdd.bytes.used), max(max_over_time(metrics.gaugel_hdd.bytes.used)), min(min_over_time(metrics.gaugel_hdd.bytes.used)), sum(count_over_time(metrics.gaugel_hdd.bytes.used)), @@ -592,29 +580,20 @@ public void testGroupByNothing() { List> rows = consumeRows(resp); var groups = groupedRows(documents, List.of(), 60); for (List row : rows) { - var windowStart = windowStart(row.get(7), 60); - List docValues = valuesInWindow(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used"); + var windowStart = windowStart(row.get(6), 60); var tsGroups = groupByTimeseries(groups.get(List.of(Long.toString(windowStart))), "gaugel_hdd.bytes.used"); - if (row.get(0) instanceof List) { - assertThat( - (Collection) row.get(0), - containsInAnyOrder(docValues.stream().mapToLong(Integer::longValue).boxed().toArray(Long[]::new)) - ); - } else { - assertThat(row.getFirst(), equalTo(docValues.isEmpty() ? 
null : docValues.getFirst().longValue())); - } Function toDouble = cell -> switch (cell) { case Long l -> l.doubleValue(); case Double d -> d; case null -> null; default -> throw new IllegalStateException("Unexpected value type: " + cell + " of class " + cell.getClass()); }; - assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX))); - assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN))); - assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT))); - assertThat(toDouble.apply(row.get(4)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM))); + assertThat(toDouble.apply(row.get(0)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MAX, Agg.MAX))); + assertThat(toDouble.apply(row.get(1)), equalTo(aggregatePerTimeseries(tsGroups, Agg.MIN, Agg.MIN))); + assertThat(toDouble.apply(row.get(2)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.COUNT))); + assertThat(toDouble.apply(row.get(3)), equalTo(aggregatePerTimeseries(tsGroups, Agg.SUM, Agg.SUM))); var avg = (Double) aggregatePerTimeseries(tsGroups, Agg.AVG, Agg.AVG); - assertThat((Double) row.get(5), row.get(5) == null ? equalTo(null) : closeTo(avg, avg * 0.01)); + assertThat((Double) row.get(4), row.get(4) == null ? 
equalTo(null) : closeTo(avg, avg * 0.01)); // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue())); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java index e751fbef00f78..58eaac9a7e989 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TelemetryIT.java @@ -157,7 +157,7 @@ public static Iterable parameters() { ) }, new Object[] { new Test( - "TS time_series_idx | STATS max(id) BY host | LIMIT 10", + "TS time_series_idx | STATS max(cpu) BY host | LIMIT 10", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled() ? Map.ofEntries(Map.entry("TS", 1), Map.entry("STATS", 1), Map.entry("LIMIT", 1)) : Collections.emptyMap(), diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index efaef24675e2b..121c884a57448 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -159,21 +159,7 @@ public void populateIndex() { client().admin().indices().prepareRefresh("hosts").get(); } - public void testSimpleMetrics() { - List sortedGroups = docs.stream().map(d -> d.host).distinct().sorted().toList(); - client().admin().indices().prepareRefresh("hosts").get(); - try (EsqlQueryResponse resp = run("TS hosts | STATS load=avg(cpu) BY host | SORT host")) { - List> rows = EsqlTestUtils.getValuesList(resp); - assertThat(rows, hasSize(sortedGroups.size())); - for (int i = 0; i < rows.size(); i++) { - List r 
= rows.get(i); - String pod = (String) r.get(1); - assertThat(pod, equalTo(sortedGroups.get(i))); - List values = docs.stream().filter(d -> d.host.equals(pod)).map(d -> d.cpu).toList(); - double avg = values.stream().mapToDouble(n -> n).sum() / values.size(); - assertThat((double) r.get(0), equalTo(avg)); - } - } + public void testWithoutStats() { try (EsqlQueryResponse resp = run("TS hosts | SORT @timestamp DESC, host | KEEP @timestamp, host, cpu | LIMIT 5")) { List> rows = EsqlTestUtils.getValuesList(resp); List topDocs = docs.stream() @@ -193,6 +179,34 @@ public void testSimpleMetrics() { } } + public void testImplicitAggregate() { + List sortedGroups = docs.stream().map(d -> d.host).distinct().sorted().toList(); + client().admin().indices().prepareRefresh("hosts").get(); + try (EsqlQueryResponse resp = run("TS hosts | STATS load=avg(cpu) BY host | SORT host")) { + List> rows = EsqlTestUtils.getValuesList(resp); + assertThat(rows, hasSize(sortedGroups.size())); + for (int i = 0; i < rows.size(); i++) { + List r = rows.get(i); + String pod = (String) r.get(1); + assertThat(pod, equalTo(sortedGroups.get(i))); + Map selected = new HashMap<>(); + for (Doc doc : docs) { + if (doc.host.equals(pod) == false) { + continue; + } + selected.compute(doc.cluster, (k, v) -> v == null || doc.timestamp > v.timestamp ? doc : v); + } + double sum = selected.values().stream().mapToDouble(d -> d.cpu).sum(); + double avg = sum / selected.size(); + assertThat((double) r.get(0), equalTo(avg)); + } + try (EsqlQueryResponse resp2 = run("TS hosts | STATS load=avg(last_over_time(cpu)) BY host | SORT host")) { + List> rows2 = EsqlTestUtils.getValuesList(resp2); + assertThat(rows2, equalTo(rows)); + } + } + } + public void testRateWithoutGrouping() { record RateKey(String cluster, String host) { @@ -249,27 +263,21 @@ record RateKey(String cluster, String host) { final double avg = rates.isEmpty() ? 
0.0 : rates.stream().mapToDouble(d -> d).sum() / rates.size(); assertThat((double) values.get(0).get(1), closeTo(avg, 0.1)); } - try (var resp = run("TS hosts | STATS max(rate(request_count)), min(rate(request_count)), min(cpu), max(cpu)")) { + try (var resp = run("TS hosts | STATS max(rate(request_count)), min(rate(request_count))")) { assertThat( resp.columns(), equalTo( List.of( new ColumnInfoImpl("max(rate(request_count))", "double", null), - new ColumnInfoImpl("min(rate(request_count))", "double", null), - new ColumnInfoImpl("min(cpu)", "double", null), - new ColumnInfoImpl("max(cpu)", "double", null) + new ColumnInfoImpl("min(rate(request_count))", "double", null) ) ) ); List> values = EsqlTestUtils.getValuesList(resp); assertThat(values, hasSize(1)); - assertThat(values.get(0), hasSize(4)); + assertThat(values.get(0), hasSize(2)); assertThat((double) values.get(0).get(0), closeTo(rates.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1)); assertThat((double) values.get(0).get(1), closeTo(rates.stream().mapToDouble(d -> d).min().orElse(0.0), 0.1)); - double minCpu = docs.stream().mapToDouble(d -> d.cpu).min().orElse(Long.MAX_VALUE); - double maxCpu = docs.stream().mapToDouble(d -> d.cpu).max().orElse(Long.MIN_VALUE); - assertThat((double) values.get(0).get(2), closeTo(minCpu, 0.1)); - assertThat((double) values.get(0).get(3), closeTo(maxCpu, 0.1)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 36ee0536e393b..a9dad8c322b12 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1442,7 +1442,12 @@ public enum Cap { /** * FORK with remote indices */ - ENABLE_FORK_FOR_REMOTE_INDICES(Build.current().isSnapshot()); + 
ENABLE_FORK_FOR_REMOTE_INDICES(Build.current().isSnapshot()), + + /** + * Implicitly applies last_over_time in time-series aggregations when no specific over_time function is provided. + */ + IMPLICIT_LAST_OVER_TIME(Build.current().isSnapshot()); private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java index 82ddc388b2f16..20979f0748ac4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java @@ -41,9 +41,6 @@ public static List getNamedWriteables() { SumOverTime.ENTRY, CountOverTime.ENTRY, CountDistinctOverTime.ENTRY, - // internal functions - ToPartial.ENTRY, - FromPartial.ENTRY, WeightedAvg.ENTRY ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FromPartial.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FromPartial.java deleted file mode 100644 index bb9ed1780053f..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FromPartial.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ - -package org.elasticsearch.xpack.esql.expression.function.aggregate; - -import org.elasticsearch.TransportVersions; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.compute.aggregation.Aggregator; -import org.elasticsearch.compute.aggregation.AggregatorFunction; -import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; -import org.elasticsearch.compute.aggregation.AggregatorMode; -import org.elasticsearch.compute.aggregation.FromPartialAggregatorFunction; -import org.elasticsearch.compute.aggregation.FromPartialGroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.GroupingAggregator; -import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.IntermediateStateDesc; -import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.xpack.esql.core.expression.AttributeSet; -import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.Literal; -import org.elasticsearch.xpack.esql.core.tree.NodeInfo; -import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.planner.ToAggregator; - -import java.io.IOException; -import java.util.List; -import java.util.stream.IntStream; - -/** - * @see ToPartial - */ -public class FromPartial extends AggregateFunction implements ToAggregator { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( - Expression.class, - "FromPartial", - FromPartial::new - ); - - private final Expression function; - - public FromPartial(Source source, Expression field, Expression function) { - this(source, field, Literal.TRUE, function); - 
} - - public FromPartial(Source source, Expression field, Expression filter, Expression function) { - super(source, field, filter, List.of(function)); - this.function = function; - } - - private FromPartial(StreamInput in) throws IOException { - this( - Source.readFrom((PlanStreamInput) in), - in.readNamedWriteable(Expression.class), - in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readNamedWriteable(Expression.class) : Literal.TRUE, - in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) - ? in.readNamedWriteableCollectionAsList(Expression.class).get(0) - : in.readNamedWriteable(Expression.class) - ); - } - - @Override - protected void deprecatedWriteParams(StreamOutput out) throws IOException { - out.writeNamedWriteable(function); - } - - @Override - public String getWriteableName() { - return ENTRY.name; - } - - public Expression function() { - return function; - } - - @Override - public DataType dataType() { - return function.dataType(); - } - - @Override - protected TypeResolution resolveType() { - return TypeResolution.TYPE_RESOLVED; - } - - @Override - public AttributeSet references() { - return field().references(); // exclude the function and its argument - } - - @Override - public Expression replaceChildren(List newChildren) { - return new FromPartial(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); - } - - @Override - protected NodeInfo info() { - return NodeInfo.create(this, FromPartial::new, field(), filter(), function); - } - - @Override - public FromPartial withFilter(Expression filter) { - return new FromPartial(source(), field(), filter, function); - } - - @Override - public AggregatorFunctionSupplier supplier() { - final AggregatorFunctionSupplier supplier = ((ToAggregator) function).supplier(); - return new AggregatorFunctionSupplier() { - @Override - public List nonGroupingIntermediateStateDesc() { - return FromPartialAggregatorFunction.intermediateStateDesc(); - } - - @Override - public List 
groupingIntermediateStateDesc() { - return FromPartialGroupingAggregatorFunction.intermediateStateDesc(); - } - - @Override - public AggregatorFunction aggregator(DriverContext driverContext, List channels) { - assert false : "aggregatorFactory() is override"; - throw new UnsupportedOperationException(); - } - - @Override - public GroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List channels) { - assert false : "groupingAggregatorFactory() is override"; - throw new UnsupportedOperationException(); - } - - @Override - public Aggregator.Factory aggregatorFactory(AggregatorMode mode, List channels) { - if (channels.size() != 1) { - assert false : "from_partial aggregation requires exactly one input channel; got " + channels; - throw new IllegalArgumentException("from_partial aggregation requires exactly one input channel; got " + channels); - } - final int inputChannel = channels.get(0); - var intermediateChannels = IntStream.range(0, supplier.nonGroupingIntermediateStateDesc().size()).boxed().toList(); - return new Aggregator.Factory() { - @Override - public Aggregator apply(DriverContext driverContext) { - // use groupingAggregator since we can receive intermediate output from a grouping aggregate - final var groupingAggregator = supplier.groupingAggregator(driverContext, intermediateChannels); - return new Aggregator(new FromPartialAggregatorFunction(driverContext, groupingAggregator, inputChannel), mode); - } - - @Override - public String describe() { - return "from_partial(" + supplier.describe() + ")"; - } - }; - } - - @Override - public GroupingAggregator.Factory groupingAggregatorFactory(AggregatorMode mode, List channels) { - if (channels.size() != 1) { - assert false : "from_partial aggregation requires exactly one input channel; got " + channels; - throw new IllegalArgumentException("from_partial aggregation requires exactly one input channel; got " + channels); - } - final int inputChannel = channels.get(0); - var 
intermediateChannels = IntStream.range(0, supplier.nonGroupingIntermediateStateDesc().size()).boxed().toList(); - return new GroupingAggregator.Factory() { - @Override - public GroupingAggregator apply(DriverContext driverContext) { - final GroupingAggregatorFunction aggregator = supplier.groupingAggregator(driverContext, intermediateChannels); - return new GroupingAggregator(new FromPartialGroupingAggregatorFunction(aggregator, inputChannel), mode); - } - - @Override - public String describe() { - return "from_partial(" + supplier.describe() + ")"; - } - }; - } - - @Override - public String describe() { - return "from_partial"; - } - }; - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/ToPartial.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/ToPartial.java deleted file mode 100644 index 04dadb5e3bb91..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/ToPartial.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. 
- */ - -package org.elasticsearch.xpack.esql.expression.function.aggregate; - -import org.elasticsearch.TransportVersions; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.compute.aggregation.Aggregator; -import org.elasticsearch.compute.aggregation.AggregatorFunction; -import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; -import org.elasticsearch.compute.aggregation.AggregatorMode; -import org.elasticsearch.compute.aggregation.FromPartialGroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.GroupingAggregator; -import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; -import org.elasticsearch.compute.aggregation.IntermediateStateDesc; -import org.elasticsearch.compute.aggregation.ToPartialAggregatorFunction; -import org.elasticsearch.compute.aggregation.ToPartialGroupingAggregatorFunction; -import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.Literal; -import org.elasticsearch.xpack.esql.core.tree.NodeInfo; -import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.planner.ToAggregator; - -import java.io.IOException; -import java.util.List; -import java.util.stream.IntStream; - -/** - * An internal aggregate function that always emits intermediate (or partial) output regardless - * of the aggregate mode. The intermediate output should be consumed by {@link FromPartial}, - * which always receives the intermediate input. Since an intermediate aggregate output can - * consist of multiple blocks, we wrap these output blocks in a single composite block. 
- * The {@link FromPartial} then unwraps this input block into multiple primitive blocks and - * passes them to the delegating GroupingAggregatorFunction. - *

- * Both of these commands yield the same result, except the second plan executes aggregates twice: - *

- * ```
- * | ... before
- * | af(x) BY g
- * | ... after
- * ```
- * ```
- * | ... before
- * | $x = to_partial(af(x)) BY g
- * | from_partial($x, af(_)) BY g
- * | ...  after
- * 
- * ``` - * @see ToPartialGroupingAggregatorFunction - * @see FromPartialGroupingAggregatorFunction - */ -public class ToPartial extends AggregateFunction implements ToAggregator { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( - Expression.class, - "ToPartial", - ToPartial::new - ); - - private final Expression function; - - public ToPartial(Source source, Expression field, Expression function) { - this(source, field, Literal.TRUE, function); - } - - public ToPartial(Source source, Expression field, Expression filter, Expression function) { - super(source, field, filter, List.of(function)); - this.function = function; - } - - private ToPartial(StreamInput in) throws IOException { - this( - Source.readFrom((PlanStreamInput) in), - in.readNamedWriteable(Expression.class), - in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readNamedWriteable(Expression.class) : Literal.TRUE, - in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) - ? 
in.readNamedWriteableCollectionAsList(Expression.class).get(0) - : in.readNamedWriteable(Expression.class) - ); - } - - @Override - protected void deprecatedWriteParams(StreamOutput out) throws IOException { - out.writeNamedWriteable(function); - } - - @Override - public String getWriteableName() { - return ENTRY.name; - } - - public Expression function() { - return function; - } - - @Override - public DataType dataType() { - return DataType.PARTIAL_AGG; - } - - @Override - protected TypeResolution resolveType() { - return function.typeResolved(); - } - - @Override - public Expression replaceChildren(List newChildren) { - return new ToPartial(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); - } - - @Override - public ToPartial withFilter(Expression filter) { - return new ToPartial(source(), field(), filter(), function); - } - - @Override - protected NodeInfo info() { - return NodeInfo.create(this, ToPartial::new, field(), filter(), function); - } - - @Override - public AggregatorFunctionSupplier supplier() { - final AggregatorFunctionSupplier supplier = ((ToAggregator) function).supplier(); - return new AggregatorFunctionSupplier() { - @Override - public List nonGroupingIntermediateStateDesc() { - return ToPartialAggregatorFunction.intermediateStateDesc(); - } - - @Override - public List groupingIntermediateStateDesc() { - return ToPartialGroupingAggregatorFunction.intermediateStateDesc(); - } - - @Override - public AggregatorFunction aggregator(DriverContext driverContext, List channels) { - assert false : "aggregatorFactory() is override"; - throw new UnsupportedOperationException(); - } - - @Override - public GroupingAggregatorFunction groupingAggregator(DriverContext driverContext, List channels) { - assert false : "groupingAggregatorFactory() is override"; - throw new UnsupportedOperationException(); - } - - @Override - public Aggregator.Factory aggregatorFactory(AggregatorMode mode, List channels) { - List intermediateChannels = 
mode.isInputPartial() - ? IntStream.range(0, supplier.nonGroupingIntermediateStateDesc().size()).boxed().toList() - : channels; - return new Aggregator.Factory() { - @Override - public Aggregator apply(DriverContext driverContext) { - final AggregatorFunction aggregatorFunction = supplier.aggregator(driverContext, intermediateChannels); - return new Aggregator(new ToPartialAggregatorFunction(aggregatorFunction, channels), mode); - } - - @Override - public String describe() { - return "to_partial(" + supplier.describe() + ")"; - } - }; - } - - @Override - public GroupingAggregator.Factory groupingAggregatorFactory(AggregatorMode mode, List channels) { - List intermediateChannels = mode.isInputPartial() - ? IntStream.range(0, supplier.nonGroupingIntermediateStateDesc().size()).boxed().toList() - : channels; - return new GroupingAggregator.Factory() { - @Override - public GroupingAggregator apply(DriverContext driverContext) { - final GroupingAggregatorFunction aggregatorFunction = supplier.groupingAggregator( - driverContext, - intermediateChannels - ); - return new GroupingAggregator(new ToPartialGroupingAggregatorFunction(aggregatorFunction, channels), mode); - } - - @Override - public String describe() { - return "to_partial(" + supplier.describe() + ")"; - } - }; - } - - @Override - public String describe() { - return "to_partial"; - } - }; - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java index 71ef06c9432fe..72d31c24f48df 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java @@ -19,10 +19,9 @@ import 
org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; -import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial; +import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate; import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction; -import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial; import org.elasticsearch.xpack.esql.expression.function.aggregate.Values; import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket; @@ -85,25 +84,23 @@ * | KEEP `avg(rate(request))`, host, `bucket(@timestamp, 1minute)` * * - * Non-rate aggregates will be rewritten as a pair of to_partial and from_partial aggregates, where the `to_partial` - * aggregates will be executed in the first pass and always produce an intermediate output regardless of the aggregate - * mode. The `from_partial` aggregates will be executed on the second pass and always receive intermediate output - * produced by `to_partial`. Examples: + * Non time-series aggregates will be rewritten with last_over_time used in the first pass aggregation. + * Here, we don't have the staleness interval, but allow any value within the bucket (_tsid and optionally time-bucket). * *
  * TS k8s | STATS max(rate(request)), max(memory_used) becomes:
  *
  * TS k8s
- * | STATS rate_$1=rate(request), $p1=to_partial(max(memory_used)) BY _tsid
- * | STATS max(rate_$1), `max(memory_used)` = from_partial($p1, max($_))
+ * | STATS rate_$1=rate(request), $last_m1=last_over_time(memory_used) BY _tsid
+ * | STATS max(rate_$1), `max(memory_used)` = max($last_m1)
  *
  * TS k8s | STATS max(rate(request)) avg(memory_used) BY host
  *
  * becomes
  *
  * TS k8s
- * | STATS rate_$1=rate(request), $p1=to_partial(sum(memory_used)), $p2=to_partial(count(memory_used)), VALUES(host) BY _tsid
- * | STATS max(rate_$1), $sum=from_partial($p1, sum($_)), $count=from_partial($p2, count($_)) BY host=`VALUES(host)`
+ * | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), VALUES(host) BY _tsid
+ * | STATS max(rate_$1), $sum=sum($p1), $count=count($p1) BY host=`VALUES(host)`
  * | EVAL `avg(memory_used)` = $sum / $count
  * | KEEP `max(rate(request))`, `avg(memory_used)`, host
  *
@@ -113,8 +110,8 @@
  *
  * TS k8s
  * | EVAL `bucket(@timestamp, 5m)` = datetrunc(@timestamp, '5m')
- * | STATS rate_$1=rate(request), $p1=to_partial(min(memory_used)), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
- * | STATS sum(rate_$1), `min(memory_used)` = from_partial($p1, min($)) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
+ * | STATS rate_$1=rate(request), $p1=last_over_time(memory_used), VALUES(pod) BY _tsid, `bucket(@timestamp, 5m)`
+ * | STATS sum(rate_$1), `min(memory_used)` = min($p1) BY pod=`VALUES(pod)`, `bucket(@timestamp, 5m)`
  * | KEEP `min(memory_used)`, `sum(rate_$1)`, pod, `bucket(@timestamp, 5m)`
  *
  * {agg}_over_time time-series aggregation will be rewritten in the similar way
@@ -161,6 +158,24 @@ protected LogicalPlan rule(Aggregate aggregate) {
     }
 
     LogicalPlan translate(TimeSeriesAggregate aggregate) {
+        Holder tsid = new Holder<>();
+        Holder timestamp = new Holder<>();
+        aggregate.forEachDown(EsRelation.class, r -> {
+            for (Attribute attr : r.output()) {
+                if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
+                    tsid.set(attr);
+                }
+                if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
+                    timestamp.set(attr);
+                }
+            }
+        });
+        if (tsid.get() == null) {
+            tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false));
+        }
+        if (timestamp.get() == null) {
+            throw new IllegalArgumentException("_tsid or @timestamp field are missing from the time-series source");
+        }
         Map timeSeriesAggs = new HashMap<>();
         List firstPassAggs = new ArrayList<>();
         List secondPassAggs = new ArrayList<>();
@@ -185,34 +200,17 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) {
                 if (changed.get()) {
                     secondPassAggs.add(new Alias(alias.source(), alias.name(), outerAgg, agg.id()));
                 } else {
-                    var toPartial = new Alias(agg.source(), alias.name(), new ToPartial(agg.source(), af.field(), af));
-                    var fromPartial = new FromPartial(agg.source(), toPartial.toAttribute(), af);
-                    firstPassAggs.add(toPartial);
-                    secondPassAggs.add(new Alias(alias.source(), alias.name(), fromPartial, alias.id()));
-                }
-            }
-        }
-        if (timeSeriesAggs.isEmpty()) {
-            // no time-series aggregations, run a regular aggregation instead.
-            return new Aggregate(aggregate.source(), aggregate.child(), aggregate.groupings(), aggregate.aggregates());
-        }
-        Holder tsid = new Holder<>();
-        Holder timestamp = new Holder<>();
-        aggregate.forEachDown(EsRelation.class, r -> {
-            for (Attribute attr : r.output()) {
-                if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
-                    tsid.set(attr);
-                }
-                if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
-                    timestamp.set(attr);
+                    // TODO: reject over_time_aggregation only
+                    var tsAgg = new LastOverTime(af.source(), af.field(), timestamp.get());
+                    AggregateFunction firstStageFn = tsAgg.perTimeSeriesAggregation();
+                    Alias newAgg = timeSeriesAggs.computeIfAbsent(firstStageFn, k -> {
+                        Alias firstStageAlias = new Alias(tsAgg.source(), internalNames.next(tsAgg.functionName()), firstStageFn);
+                        firstPassAggs.add(firstStageAlias);
+                        return firstStageAlias;
+                    });
+                    secondPassAggs.add((Alias) agg.transformUp(f -> f == af.field(), f -> newAgg.toAttribute()));
                 }
             }
-        });
-        if (tsid.get() == null) {
-            tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false));
-        }
-        if (timestamp.get() == null) {
-            throw new IllegalArgumentException("_tsid or @timestamp field are missing from the time-series source");
         }
         // time-series aggregates must be grouped by _tsid (and time-bucket) first and re-group by users key
         List firstPassGroupings = new ArrayList<>();
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java
index 991da09fc8dd2..696159d6e5eb1 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java
@@ -321,8 +321,10 @@ public PlanFactory visitInsistCommand(EsqlBaseParser.InsistCommandContext ctx) {
     @Override
     public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
         final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats);
+        // Only the first STATS command in a TS query is treated as the time-series aggregation
         return input -> {
-            if (input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
+            if (input.anyMatch(p -> p instanceof Aggregate) == false
+                && input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
                 return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null);
             } else {
                 return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
index 82119fb7baa82..41db4b35e5287 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
@@ -46,14 +46,13 @@
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
-import org.elasticsearch.xpack.esql.expression.function.aggregate.FromPartial;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Min;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Percentile;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode;
-import org.elasticsearch.xpack.esql.expression.function.aggregate.ToPartial;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
 import org.elasticsearch.xpack.esql.expression.function.fulltext.Match;
 import org.elasticsearch.xpack.esql.expression.function.fulltext.MultiMatch;
@@ -7346,7 +7345,7 @@ public void testTranslateMixedAggsWithoutGrouping() {
 
         assertThat(finalAggs.aggregates(), hasSize(2));
         Max maxRate = as(Alias.unwrap(finalAggs.aggregates().get(0)), Max.class);
-        FromPartial maxCost = as(Alias.unwrap(finalAggs.aggregates().get(1)), FromPartial.class);
+        Max maxCost = as(Alias.unwrap(finalAggs.aggregates().get(1)), Max.class);
         assertThat(Expressions.attribute(maxRate.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
         assertThat(Expressions.attribute(maxCost.field()).id(), equalTo(aggsByTsid.aggregates().get(1).id()));
         assertThat(finalAggs.groupings(), empty());
@@ -7354,8 +7353,8 @@ public void testTranslateMixedAggsWithoutGrouping() {
         assertThat(aggsByTsid.aggregates(), hasSize(2));
         Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
         assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
-        ToPartial toPartialMaxCost = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), ToPartial.class);
-        assertThat(Expressions.attribute(toPartialMaxCost.field()).name(), equalTo("network.cost"));
+        LastOverTime lastCost = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), LastOverTime.class);
+        assertThat(Expressions.attribute(lastCost.field()).name(), equalTo("network.cost"));
     }
 
     public void testTranslateMixedAggsWithMathWithoutGrouping() {
@@ -7383,15 +7382,15 @@ public void testTranslateMixedAggsWithMathWithoutGrouping() {
         assertThat(mul.right().fold(FoldContext.small()), equalTo(1.1));
 
         Max maxRate = as(Alias.unwrap(finalAggs.aggregates().get(0)), Max.class);
-        FromPartial maxCost = as(Alias.unwrap(finalAggs.aggregates().get(1)), FromPartial.class);
+        Max maxCost = as(Alias.unwrap(finalAggs.aggregates().get(1)), Max.class);
         assertThat(Expressions.attribute(maxRate.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
         assertThat(Expressions.attribute(maxCost.field()).id(), equalTo(aggsByTsid.aggregates().get(1).id()));
         assertThat(finalAggs.groupings(), empty());
 
         Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
         assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
-        ToPartial toPartialMaxCost = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), ToPartial.class);
-        assertThat(Expressions.attribute(toPartialMaxCost.field()).id(), equalTo(addEval.fields().get(0).id()));
+        LastOverTime lastCost = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), LastOverTime.class);
+        assertThat(Expressions.attribute(lastCost.field()).id(), equalTo(addEval.fields().get(0).id()));
         assertThat(Expressions.attribute(add.left()).name(), equalTo("network.cost"));
         assertThat(add.right().fold(FoldContext.small()), equalTo(0.2));
     }
@@ -7584,24 +7583,20 @@ public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() {
         assertThat(Expressions.attribute(sumRate.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
         assertThat(Expressions.attribute(countRate.field()).id(), equalTo(aggsByTsid.aggregates().get(0).id()));
 
-        FromPartial sumCost = as(Alias.unwrap(finalAgg.aggregates().get(2)), FromPartial.class);
-        FromPartial countCost = as(Alias.unwrap(finalAgg.aggregates().get(3)), FromPartial.class);
+        Sum sumCost = as(Alias.unwrap(finalAgg.aggregates().get(2)), Sum.class);
+        Count countCost = as(Alias.unwrap(finalAgg.aggregates().get(3)), Count.class);
         assertThat(Expressions.attribute(sumCost.field()).id(), equalTo(aggsByTsid.aggregates().get(1).id()));
-        assertThat(Expressions.attribute(countCost.field()).id(), equalTo(aggsByTsid.aggregates().get(2).id()));
+        assertThat(Expressions.attribute(countCost.field()).id(), equalTo(aggsByTsid.aggregates().get(1).id()));
 
         assertThat(finalAgg.groupings(), hasSize(2));
-        assertThat(Expressions.attribute(finalAgg.groupings().get(0)).id(), equalTo(aggsByTsid.aggregates().get(3).id()));
+        assertThat(Expressions.attribute(finalAgg.groupings().get(0)).id(), equalTo(aggsByTsid.aggregates().get(2).id()));
 
-        assertThat(aggsByTsid.aggregates(), hasSize(5)); // rate, to_partial(sum(cost)), to_partial(count(cost)), values(cluster), bucket
+        assertThat(aggsByTsid.aggregates(), hasSize(4)); // rate, last_over_time, values(cluster), bucket
         Rate rate = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), Rate.class);
         assertThat(Expressions.attribute(rate.field()).name(), equalTo("network.total_bytes_in"));
-        ToPartial toPartialSum = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), ToPartial.class);
-        assertThat(toPartialSum.function(), instanceOf(Sum.class));
-        assertThat(Expressions.attribute(toPartialSum.field()).name(), equalTo("network.cost"));
-        ToPartial toPartialCount = as(Alias.unwrap(aggsByTsid.aggregates().get(2)), ToPartial.class);
-        assertThat(toPartialCount.function(), instanceOf(Count.class));
-        assertThat(Expressions.attribute(toPartialCount.field()).name(), equalTo("network.cost"));
-        Values clusterValues = as(Alias.unwrap(aggsByTsid.aggregates().get(4)), Values.class);
+        LastOverTime lastSum = as(Alias.unwrap(aggsByTsid.aggregates().get(1)), LastOverTime.class);
+        assertThat(Expressions.attribute(lastSum.field()).name(), equalTo("network.cost"));
+        Values clusterValues = as(Alias.unwrap(aggsByTsid.aggregates().get(3)), Values.class);
         assertThat(Expressions.attribute(clusterValues.field()).name(), equalTo("cluster"));
     }
 
@@ -7725,36 +7720,34 @@ public void testTranslateAvgOverTime() {
         assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
     }
 
-    public void testMetricsWithoutRate() {
+    public void testTranslateLastOverTime() {
         assumeTrue("requires metrics command", EsqlCapabilities.Cap.METRICS_COMMAND.isEnabled());
-        List queries = List.of("""
-            TS k8s | STATS count(to_long(network.total_bytes_in)) BY bucket(@timestamp, 1 minute)
-            | LIMIT 10
-            """, """
-            FROM k8s | STATS count(to_long(network.total_bytes_in)) BY bucket(@timestamp, 1 minute)
+        var query = """
+            TS k8s | STATS avg(last_over_time(network.bytes_in)) BY bucket(@timestamp, 1 minute)
             | LIMIT 10
-            """);
-        List plans = new ArrayList<>();
-        for (String query : queries) {
-            var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
-            plans.add(plan);
-        }
-        for (LogicalPlan plan : plans) {
-            Limit limit = as(plan, Limit.class);
-            Aggregate aggregate = as(limit.child(), Aggregate.class);
-            assertThat(aggregate, not(instanceOf(TimeSeriesAggregate.class)));
-            assertThat(aggregate.aggregates(), hasSize(2));
-            assertThat(aggregate.groupings(), hasSize(1));
-            Eval eval = as(aggregate.child(), Eval.class);
-            assertThat(eval.fields(), hasSize(2));
-            assertThat(Alias.unwrap(eval.fields().get(0)), instanceOf(Bucket.class));
-            assertThat(Alias.unwrap(eval.fields().get(1)), instanceOf(ToLong.class));
-            EsRelation relation = as(eval.child(), EsRelation.class);
-            assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
-        }
-        for (int i = 1; i < plans.size(); i++) {
-            assertThat(plans.get(i), equalTo(plans.get(0)));
-        }
+            """;
+        var plan = logicalOptimizer.optimize(metricsAnalyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)));
+        var project = as(plan, Project.class);
+        var eval = as(project.child(), Eval.class);
+        var limit = as(eval.child(), Limit.class);
+        Aggregate finalAgg = as(limit.child(), Aggregate.class);
+        assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class)));
+        TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class);
+        assertNotNull(aggsByTsid.timeBucket());
+        assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(1)));
+        Eval evalBucket = as(aggsByTsid.child(), Eval.class);
+        assertThat(evalBucket.fields(), hasSize(1));
+        EsRelation relation = as(evalBucket.child(), EsRelation.class);
+        assertThat(relation.indexMode(), equalTo(IndexMode.STANDARD));
+
+        as(Alias.unwrap(finalAgg.aggregates().get(0)), Sum.class);
+        as(Alias.unwrap(finalAgg.aggregates().get(1)), Count.class);
+
+        LastOverTime lastOverTime = as(Alias.unwrap(aggsByTsid.aggregates().get(0)), LastOverTime.class);
+        assertThat(Expressions.attribute(lastOverTime.field()).name(), equalTo("network.bytes_in"));
+        assertThat(Expressions.attribute(aggsByTsid.groupings().get(1)).id(), equalTo(evalBucket.fields().get(0).id()));
+        Bucket bucket = as(Alias.unwrap(evalBucket.fields().get(0)), Bucket.class);
+        assertThat(Expressions.attribute(bucket.field()).name(), equalTo("@timestamp"));
     }
 
     public void testMvSortInvalidOrder() {