diff --git a/docs/changelog/136062.yaml b/docs/changelog/136062.yaml new file mode 100644 index 0000000000000..6ddd4f0ca5616 --- /dev/null +++ b/docs/changelog/136062.yaml @@ -0,0 +1,6 @@ +pr: 136062 +summary: ESQL Fix bug when renaming @timestamp in TS queries +area: ES|QL +type: bug +issues: + - 134994 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 66f20720d674d..b7cd0a97bf720 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -559,3 +559,26 @@ cnt:long | cluster:keyword | pod:keyword 1 | prod | two 1 | prod | three ; + +Max of Rate with Bucket +required_capability: ts_command_v0 + +TS k8s +| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(@timestamp, 1hour) +; + +maxRate:double | tbucket:datetime +0.058979885057471274 | 2024-05-10T00:00:00.000Z +; + +Max of Rate with Bucket, Rename Timestamp +required_capability: ts_command_v0 + +TS k8s +| RENAME `@timestamp` AS newTs +| STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(newTs, 1hour) +; + +maxRate:double | tbucket:datetime +0.058979885057471274 | 2024-05-10T00:00:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 7057ca3d36a45..a3d0e7732c2d2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -488,10 +488,15 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) { // Gather all the children's output in case of non-unary plans; even for unaries, we need to copy because we may mutate this to // simplify resolution of e.g. RENAME. for (LogicalPlan child : plan.children()) { - var output = child.output(); + List output = child.output(); childrenOutput.addAll(output); } + if (plan instanceof TimeSeriesAggregate tsAggregate) { + // NOTE: This MUST be checked before the Aggregate version, since TimeSeriesAggregate is a subclass of Aggregate + return resolveTimeSeriesAggregate(tsAggregate, childrenOutput); + } + if (plan instanceof Aggregate aggregate) { return resolveAggregate(aggregate, childrenOutput); } @@ -551,6 +556,63 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) { return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)); } + /** + * This function is meant to deal with the implicit timestamp fields that some TS functions use. + */ + private TimeSeriesAggregate resolveTimeSeriesAggregate(TimeSeriesAggregate timeSeriesAggregate, List childrenOutput) { + Attribute tsAtter = null; + for (int i = 0; i < childrenOutput.size(); i++) { + if (childrenOutput.get(i).name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { + tsAtter = childrenOutput.get(i); + break; + } + } + if (tsAtter == null) { + // if we didn't find a timestamp in the children output, time to do more work + Holder tsAttributeName = new Holder<>(MetadataAttribute.TIMESTAMP_FIELD); + Holder tsAttribute = new Holder<>(); + timeSeriesAggregate.forEachExpressionUp(Alias.class, a -> { + if (a.child() instanceof Attribute c) { + // will this ever not be true? 
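+ // An Alias may also wrap a computed expression (e.g. from an EVAL); such an alias cannot be a plain rename of @timestamp, so it is skipped here.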
+ if (c.name().equals(tsAttributeName.get())) { + tsAttributeName.set(a.name()); + tsAttribute.set(a.toAttribute()); + } + } + }); + + // Now we know what timestamp is going to be called, replace our UnresolvedAttributes referencing timestamp with that name + List newGroupings = new ArrayList<>(timeSeriesAggregate.groupings().size()); + List newAggregates = new ArrayList<>(timeSeriesAggregate.aggregates().size()); + // TODO: Can we just resolve these here? we have the attribute + for (int i = 0; i < timeSeriesAggregate.groupings().size(); i++) { + newGroupings.add(timeSeriesAggregate.groupings().get(i).transformUp(UnresolvedAttribute.class, ua -> { + if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { + return new UnresolvedAttribute(ua.source(), tsAttributeName.get()); + } + return ua; + })); + } + + for (int i = 0; i < timeSeriesAggregate.aggregates().size(); i++) { + newAggregates.add( + (NamedExpression) timeSeriesAggregate.aggregates().get(i).transformUp(UnresolvedAttribute.class, ua -> { + if (ua.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { + return new UnresolvedAttribute(ua.source(), tsAttributeName.get()); + } + return ua; + }) + ); + } + timeSeriesAggregate = timeSeriesAggregate.with(newGroupings, newAggregates, tsAttribute.get()); + } else { + timeSeriesAggregate = timeSeriesAggregate.with(timeSeriesAggregate.groupings(), timeSeriesAggregate.aggregates(), tsAtter); + } + + // After correcting the timestamps, we still need to resolve the node as normal, so delegate to resolveAggregate + return (TimeSeriesAggregate) resolveAggregate(timeSeriesAggregate, childrenOutput); + } + private Aggregate resolveAggregate(Aggregate aggregate, List childrenOutput) { // if the grouping is resolved but the aggs are not, use the former to resolve the latter // e.g. STATS a ... GROUP BY a = x + 1 @@ -1083,7 +1145,7 @@ private Attribute resolveAttribute(UnresolvedAttribute ua, List child private static Attribute resolveAttribute(UnresolvedAttribute ua, List childrenOutput, Logger logger) { Attribute resolved = ua; - var named = resolveAgainstList(ua, childrenOutput); + List named = resolveAgainstList(ua, childrenOutput); // if resolved, return it; otherwise keep it in place to be resolved later if (named.size() == 1) { resolved = named.get(0); @@ -1253,9 +1315,9 @@ public static List projectionsForRename(Rename rename, List x.name().equals(alias.name())); childrenOutput.removeIf(x -> x.name().equals(alias.name())); - var resolved = maybeResolveAttribute(ua, childrenOutput, logger); + Attribute resolved = maybeResolveAttribute(ua, childrenOutput, logger); if (resolved instanceof UnsupportedAttribute || resolved.resolved()) { - var realiased = (NamedExpression) alias.replaceChildren(List.of(resolved)); + NamedExpression realiased = alias.replaceChildren(List.of(resolved)); projections.replaceAll(x -> x.equals(resolved) ? 
realiased : x); childrenOutput.removeIf(x -> x.equals(resolved)); reverseAliasing.put(resolved.name(), alias.name()); @@ -1356,7 +1418,7 @@ private static List resolveAgainstList(UnresolvedNamePattern up, Coll } private static List resolveAgainstList(UnresolvedAttribute ua, Collection attrList) { - var matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a)); + List matches = AnalyzerRules.maybeResolveAgainstList(ua, attrList, a -> Analyzer.handleSpecialFields(ua, a)); return potentialCandidatesIfNoMatchesFound(ua, matches, attrList, list -> UnresolvedAttribute.errorMessage(ua.name(), list)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RuleUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RuleUtils.java index 67e51fa524cc9..60c81ef0515c7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RuleUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/RuleUtils.java @@ -87,7 +87,7 @@ public static AttributeMap foldableReferences(LogicalPlan plan, Logi // collect aliases bottom-up plan.forEachExpressionUp(Alias.class, a -> { - var c = a.child(); + Expression c = a.child(); boolean shouldCollect = c.foldable(); // try to resolve the expression based on an existing foldables if (shouldCollect == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java index d18bc94791691..2930783f20475 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java @@ -165,15 +165,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) { if (attr.name().equals(MetadataAttribute.TSID_FIELD)) { tsid.set(attr); } - if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { - timestamp.set(attr); - } } }); if (tsid.get() == null) { tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.KEYWORD, false)); } - if (timestamp.get() == null) { + if (aggregate.timestamp() == null) { throw new IllegalArgumentException("_tsid or @timestamp field are missing from the time-series source"); } Map timeSeriesAggs = new HashMap<>(); @@ -216,7 +213,7 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) { } else { // TODO: reject over_time_aggregation only final Expression aggField = af.field(); - var tsAgg = new LastOverTime(af.source(), aggField, timestamp.get()); + var tsAgg = new LastOverTime(af.source(), aggField, aggregate.timestamp()); final AggregateFunction firstStageFn; if (inlineFilter != null) { firstStageFn = tsAgg.perTimeSeriesAggregation().withFilter(inlineFilter); @@ -247,12 +244,12 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) { Holder timeBucketRef = new Holder<>(); aggregate.child().forEachExpressionUp(NamedExpression.class, e -> { for (Expression child : e.children()) { - if (child instanceof Bucket bucket && bucket.field().equals(timestamp.get())) { + if (child instanceof Bucket bucket && bucket.field().equals(aggregate.timestamp())) { if (timeBucketRef.get() != null) { throw new IllegalArgumentException("expected at most one time bucket"); } 
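+ // Remember the NamedExpression wrapping the time bucket; the rewrite below reuses it as a grouping for both passes.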
timeBucketRef.set(e); - } else if (child instanceof TBucket tbucket && tbucket.field().equals(timestamp.get())) { + } else if (child instanceof TBucket tbucket && tbucket.field().equals(aggregate.timestamp())) { if (timeBucketRef.get() != null) { throw new IllegalArgumentException("expected at most one time tbucket"); } @@ -296,7 +293,8 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) { newChild, firstPassGroupings, mergeExpressions(firstPassAggs, firstPassGroupings), - (Bucket) Alias.unwrap(timeBucket) + (Bucket) Alias.unwrap(timeBucket), + aggregate.timestamp() ); return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index dbd18446e748d..0d9be667d8295 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -362,7 +362,7 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) { return input -> { if (input.anyMatch(p -> p instanceof Aggregate) == false && input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) { - return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null); + return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null, null); } else { return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java index 81a89950b0a02..1cf779b3e4479 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java @@ -203,6 +203,15 @@ public void forEachExpressionDown(Class type forEachPropertyDown(Object.class, e -> doForEachExpression(e, exp -> exp.forEachDown(typeToken, rule))); } + /** + * This traverses all {@link Expression} nodes for all children of the current node, applying the given function to each of them. + * It does not transform (i.e. 
replace) those nodes, it just hands them to the consumer, which can read but not modify them (since the + nodes are immutable). + * + * @param typeToken Only process expressions matching the given type + * @param rule a non-modifying consumer which operates on the given token type + * @param <E> the type of expression this pass will process + */ public void forEachExpressionUp(Class typeToken, Consumer rule) { forEachPropertyUp(Object.class, e -> doForEachExpression(e, exp -> exp.forEachUp(typeToken, rule))); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index 2fd9fb77764d1..fe55a82feef37 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -12,6 +12,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.xpack.esql.common.Failures; import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; @@ -41,21 +42,30 @@ public class TimeSeriesAggregate extends Aggregate { ); private final Bucket timeBucket; + /** + * This field is used by {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate} to help with + * resolving the timestamp field, but should not be needed after the initial logical planning on the coordinating node. As such, + * it is not serialized.
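+ * The attribute may be null: the parser creates the node with a null timestamp and deserialization leaves it null; the
+ * analyzer fills it in on the coordinating node, following any renames. For example, for
+ * {@code TS k8s | RENAME `@timestamp` AS ts | STATS max(rate(network.total_cost)) BY bucket(ts, 1 hour)} the renamed
+ * attribute {@code ts} is recorded here so that TranslateTimeSeriesAggregate can still find the time bucket.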
+ */ + private final Attribute timestamp; public TimeSeriesAggregate( Source source, LogicalPlan child, List groupings, List aggregates, - Bucket timeBucket + Bucket timeBucket, + Attribute timestamp ) { super(source, child, groupings, aggregates); this.timeBucket = timeBucket; + this.timestamp = timestamp; } public TimeSeriesAggregate(StreamInput in) throws IOException { super(in); this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp)); + this.timestamp = null; } @Override @@ -71,17 +81,21 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket); + return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, timestamp); } @Override public TimeSeriesAggregate replaceChild(LogicalPlan newChild) { - return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket); + return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, timestamp); } @Override public TimeSeriesAggregate with(LogicalPlan child, List newGroupings, List newAggregates) { - return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket); + return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, timestamp); + } + + public TimeSeriesAggregate with(List newGroupings, List newAggregates, Attribute newTimestamp) { + return new TimeSeriesAggregate(source(), child(), newGroupings, newAggregates, timeBucket, newTimestamp); } @Override @@ -94,6 +108,10 @@ public Bucket timeBucket() { return timeBucket; } + public Attribute timestamp() { + return timestamp; + } + @Override public int hashCode() { return Objects.hash(groupings, aggregates, child(), timeBucket); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregateTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregateTests.java new file mode 100644 index 0000000000000..5be0f811f22ca --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregateTests.java @@ -0,0 +1,245 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical; + +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.AnalyzerContext; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Max; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.IndexResolution; +import org.elasticsearch.xpack.esql.optimizer.AbstractLogicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Limit; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; +import org.elasticsearch.xpack.esql.plan.logical.TopN; +import org.junit.BeforeClass; + +import java.util.Map; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; + +public class TranslateTimeSeriesAggregateTests extends AbstractLogicalPlanOptimizerTests { + + private static Map mappingK8s; + private static Analyzer k8sAnalyzer; + + @BeforeClass + public static void initK8s() { + // Load Time Series mappings for these tests + mappingK8s = loadMapping("k8s-mappings.json"); + EsIndex k8sIndex = new EsIndex("k8s", mappingK8s, Map.of("k8s", IndexMode.TIME_SERIES)); + IndexResolution getIndexResult = IndexResolution.valid(k8sIndex); + k8sAnalyzer = new Analyzer( + new AnalyzerContext( + EsqlTestUtils.TEST_CFG, + new EsqlFunctionRegistry(), + getIndexResult, + defaultLookupResolution(), + enrichResolution, + emptyInferenceResolution() + ), + TEST_VERIFIER + ); + } + + protected LogicalPlan planK8s(String query) { + LogicalPlan analyzed = k8sAnalyzer.analyze(parser.createStatement(query, EsqlTestUtils.TEST_CFG)); + LogicalPlan optimized = logicalOptimizer.optimize(analyzed); + return optimized; + } + + /** + * Test that {@link TranslateTimeSeriesAggregate} correctly splits up a two stage aggregation with a time bucket. + * + * Expected plan: + *
{@code
+     * Limit[10[INTEGER],false]
+     * \_Aggregate[[time_bucket{r}#4],[COUNT(MAXOVERTIME_$1{r}#33,true[BOOLEAN]) AS count(max_over_time(network.cost))#6, time_bucket{r}#4]]
+     *   \_TimeSeriesAggregate[[_tsid{m}#34, time_bucket{r}#4],[MAX(network.cost{f}#24,true[BOOLEAN]) AS MAXOVERTIME_$1#33,
+     *                         time_bucket{r}#4 AS time_bucket#4],BUCKET(@timestamp{f}#8,PT1M[TIME_DURATION])]
+     *     \_Eval[[BUCKET(@timestamp{f}#8,PT1M[TIME_DURATION]) AS time_bucket#4]]
+     *       \_EsRelation[k8s][@timestamp{f}#8, client.ip{f}#12, cluster{f}#9, eve..]
+     * }
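+     * The single STATS is split in two: the inner TimeSeriesAggregate computes MAX per {@code _tsid} and time bucket
+     * (the max_over_time part), and the outer Aggregate counts those per-series results, grouped by the time bucket alone.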
+ */ + public void testMaxOverTime() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS count(max_over_time(network.cost)) BY time_bucket = BUCKET(@timestamp, 1 minute) + | LIMIT 10 + """); + Limit limit = as(plan, Limit.class); + Aggregate outerStats = as(limit.child(), Aggregate.class); + TimeSeriesAggregate innerStats = as(outerStats.child(), TimeSeriesAggregate.class); + // TODO: Add asserts about the specific aggregation details here + Eval eval = as(innerStats.child(), Eval.class); + EsRelation relation = as(eval.child(), EsRelation.class); + } + + public void testMaxOfRate() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS max(rate(network.total_bytes_in)) BY time_bucket = BUCKET(@timestamp, 1 minute) + | LIMIT 10 + """); + Limit limit = as(plan, Limit.class); + + Aggregate outerStats = as(limit.child(), Aggregate.class); + assertEquals(1, outerStats.groupings().size()); + Attribute timeBucketGroup = as(outerStats.groupings().get(0), Attribute.class); + assertEquals("time_bucket", timeBucketGroup.name()); + assertEquals(2, outerStats.aggregates().size()); + assertEquals(timeBucketGroup, outerStats.aggregates().get(1)); + Alias outerAggFunction = as(outerStats.aggregates().get(0), Alias.class); + Max outerMax = as(outerAggFunction.child(), Max.class); + + TimeSeriesAggregate innerStats = as(outerStats.child(), TimeSeriesAggregate.class); + assertEquals(2, innerStats.groupings().size()); + assertEquals(timeBucketGroup, innerStats.groupings().get(1)); + Attribute tsidGroup = as(innerStats.groupings().get(0), Attribute.class); + assertEquals("_tsid", tsidGroup.name()); + + assertEquals(2, innerStats.aggregates().size()); + Alias innerAggFunction = as(innerStats.aggregates().get(0), Alias.class); + Rate rateAgg = as(innerAggFunction.child(), Rate.class); + Alias timeBucketAlias = as(innerStats.aggregates().get(1), Alias.class); + assertEquals(timeBucketGroup, timeBucketAlias.child()); + + Eval eval = as(innerStats.child(), Eval.class); + EsRelation relation = as(eval.child(), EsRelation.class); + } + + /** + *
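+     * Since averages cannot be merged directly, both the outer avg and the inner avg_over_time are expanded into SUM/COUNT
+     * pairs that are divided in the surrounding Eval nodes (surrogate expansion). Expected plan: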
{@code
+     * Project[[avg_cost{r}#8, cluster{f}#14, time_bucket{r}#5]]
+     * \_TopN[[Order[avg_cost{r}#8,DESC,FIRST], Order[time_bucket{r}#5,DESC,FIRST], Order[cluster{f}#14,ASC,LAST]],10[INTEGER]]
+     *   \_Eval[[$$SUM$avg_cost$0{r$}#38 / $$COUNT$avg_cost$1{r$}#39 AS avg_cost#8]]
+     *     \_Aggregate[[cluster{r}#14, time_bucket{r}#5],[SUM(AVGOVERTIME_$1{r}#41,true[BOOLEAN],compensated[KEYWORD])
+     *                  AS $$SUM$avg_cost$0#38, COUNT(AVGOVERTIME_$1{r}#41,true[BOOLEAN]) AS $$COUNT$avg_cost$1#39,
+     *                  cluster{r}#14 AS cluster#14, time_bucket{r}#5 AS time_bucket#5]]
+     *       \_Eval[[$$SUM$AVGOVERTIME_$1$0{r$}#42 / $$COUNT$AVGOVERTIME_$1$1{r$}#43 AS AVGOVERTIME_$1#41]]
+     *         \_TimeSeriesAggregate[[_tsid{m}#40, time_bucket{r}#5],[SUM(network.cost{f}#29,true[BOOLEAN],lossy[KEYWORD])
+     *                                AS $$SUM$AVGOVERTIME_$1$0#42, COUNT(network.cost{f}#29,true[BOOLEAN]) AS $$COUNT$AVGOVERTIME_$1$1#43,
+     *                                VALUES(cluster{f}#14,true[BOOLEAN]) AS cluster#14, time_bucket{r}#5],
+     *                                BUCKET(@timestamp{f}#13,PT1M[TIME_DURATION])]
+     *           \_Eval[[BUCKET(@timestamp{f}#13,PT1M[TIME_DURATION]) AS time_bucket#5]]
+     *             \_EsRelation[k8s][@timestamp{f}#13, client.ip{f}#17, cluster{f}#14, e..]
+     * }
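+     * Note that the non-time grouping {@code cluster} is carried through the first pass with a VALUES aggregation, and the
+     * time bucket is computed once in the bottom Eval and reused as a grouping by both passes.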
+ */ + public void testAvgOfAvgOverTime() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | STATS avg_cost=avg(avg_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) + | SORT avg_cost DESC, time_bucket DESC, cluster + | LIMIT 10 + """); + Project project = as(plan, Project.class); + TopN topN = as(project.child(), TopN.class); + Eval outerEval = as(topN.child(), Eval.class); // This is the surrogate average calculation for the outer average + Aggregate outerStats = as(outerEval.child(), Aggregate.class); + Eval innerEval = as(outerStats.child(), Eval.class); // Surrogate for the inner average + TimeSeriesAggregate innerStats = as(innerEval.child(), TimeSeriesAggregate.class); + Eval bucketEval = as(innerStats.child(), Eval.class); // compute the tbucket + EsRelation relation = as(bucketEval.child(), EsRelation.class); + } + + public void testRateWithRename() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | RENAME `@timestamp` AS newTs + | STATS maxRate = max(rate(network.total_cost)) BY time_bucket = bucket(newTs, 1hour) + """); + Limit limit = as(plan, Limit.class); + + Aggregate outerStats = as(limit.child(), Aggregate.class); + assertEquals(1, outerStats.groupings().size()); + Attribute timeBucketGroup = as(outerStats.groupings().get(0), Attribute.class); + assertEquals("time_bucket", timeBucketGroup.name()); + assertEquals(2, outerStats.aggregates().size()); + assertEquals(timeBucketGroup, outerStats.aggregates().get(1)); + Alias outerAggFunction = as(outerStats.aggregates().get(0), Alias.class); + Max outerMax = as(outerAggFunction.child(), Max.class); + + TimeSeriesAggregate innerStats = as(outerStats.child(), TimeSeriesAggregate.class); + assertEquals(2, innerStats.groupings().size()); + Attribute tsidGroup = as(innerStats.groupings().get(0), Attribute.class); + assertEquals("_tsid", tsidGroup.name()); + assertEquals(timeBucketGroup, innerStats.groupings().get(1)); + + assertEquals(2, innerStats.aggregates().size()); + Alias innerAggFunction = as(innerStats.aggregates().get(0), Alias.class); + Rate rateAgg = as(innerAggFunction.child(), Rate.class); + Alias timeBucketAlias = as(innerStats.aggregates().get(1), Alias.class); + assertEquals(timeBucketGroup, timeBucketAlias.child()); + + Eval eval = as(innerStats.child(), Eval.class); + EsRelation relation = as(eval.child(), EsRelation.class); + } + + public void testRateWithManyRenames() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | RENAME `@timestamp` AS ts1 + | RENAME ts1 AS ts2 + | RENAME ts2 AS ts3 + | RENAME ts3 AS ts4 + | RENAME ts4 as newTs + | STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(newTs, 1hour) + """); + } + + public void testOverTimeFunctionWithRename() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | RENAME `@timestamp` AS newTs + | STATS maxRate = max(max_over_time(network.eth0.tx)) BY tbucket = bucket(newTs, 1hour) + """); + } + + public void testTbucketWithRename() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | RENAME `@timestamp` AS newTs + | STATS maxRate = max(max_over_time(network.eth0.tx)) BY tbucket = tbucket(1hour) + 
"""); + } + + public void testTbucketWithManyRenames() { + assumeTrue("requires metrics command", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); + LogicalPlan plan = planK8s(""" + TS k8s + | RENAME `@timestamp` AS ts1 + | RENAME ts1 AS ts2 + | RENAME ts2 AS ts3 + | RENAME ts3 AS ts4 + | STATS maxRate = max(max_over_time(network.eth0.tx)) BY tbucket = tbucket(1hour) + """); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index bc8f9bda86fbb..54901e795783b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -2496,6 +2496,7 @@ public void testSimpleMetricsWithStats() { new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), attribute("ts") ), + null, null ) ); @@ -2509,6 +2510,7 @@ public void testSimpleMetricsWithStats() { new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), attribute("ts") ), + null, null ) ); @@ -2532,6 +2534,7 @@ public void testSimpleMetricsWithStats() { ), attribute("ts") ), + null, null ) ); @@ -2542,6 +2545,7 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "count(errors)", new UnresolvedFunction(EMPTY, "count", DEFAULT, List.of(attribute("errors"))))), + null, null ) ); @@ -2552,6 +2556,7 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))), + null, null ) ); @@ -2562,6 +2567,7 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))), + null, null ) ); @@ -2572,6 +2578,7 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "a1(b2)", new UnresolvedFunction(EMPTY, "a1", DEFAULT, List.of(attribute("b2"))))), + null, null ) ); @@ -2586,6 +2593,7 @@ public void testSimpleMetricsWithStats() { attribute("c"), attribute("d.e") ), + null, null ) ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java index 4d04bc557afba..cd218a3bf8d0f 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java @@ -24,7 +24,7 @@ protected TimeSeriesAggregate createTestInstance() { List groupings = randomFieldAttributes(0, 5, false).stream().map(a -> (Expression) a).toList(); List aggregates = AggregateSerializationTests.randomAggregates(); Bucket timeBucket = BucketSerializationTests.createRandomBucket(); - return new TimeSeriesAggregate(source, child, groupings, aggregates, timeBucket); + return new TimeSeriesAggregate(source, child, groupings, aggregates, timeBucket, null); } @Override @@ -42,7 +42,7 @@ protected TimeSeriesAggregate mutateInstance(TimeSeriesAggregate instance) throw case 2 -> aggregates = 
randomValueOtherThan(aggregates, AggregateSerializationTests::randomAggregates); case 3 -> timeBucket = randomValueOtherThan(timeBucket, BucketSerializationTests::createRandomBucket); } - return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, timeBucket); + return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, timeBucket, null); } @Override