diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java index 767d255e1f06a..736e88aa3cf82 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.common.Rounding; import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.aggregation.GroupingAggregator; @@ -23,6 +24,7 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator { public record Factory( + Rounding.Prepared timeBucket, List groups, AggregatorMode aggregatorMode, List aggregators, @@ -32,6 +34,7 @@ public record Factory( public Operator get(DriverContext driverContext) { // TODO: use TimeSeriesBlockHash when possible return new TimeSeriesAggregationOperator( + timeBucket, aggregators, () -> BlockHash.build( groups, @@ -53,11 +56,15 @@ public String describe() { } } + private final Rounding.Prepared timeBucket; + public TimeSeriesAggregationOperator( + Rounding.Prepared timeBucket, List aggregators, Supplier blockHash, DriverContext driverContext ) { super(aggregators, blockHash, driverContext); + this.timeBucket = timeBucket; } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java index 7e58968bebb21..e6b30a4797d45 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.common.Rounding; import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.aggregation.GroupingAggregator; @@ -46,6 +47,7 @@ public record SupplierWithChannels(AggregatorFunctionSupplier supplier, List groupings, List rates, List nonRates, @@ -62,6 +64,7 @@ public Operator get(DriverContext driverContext) { } aggregators.addAll(valuesAggregatorForGroupings(groupings, timeBucketChannel)); return new TimeSeriesAggregationOperator( + timeBucket, aggregators, () -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext.blockFactory()), driverContext @@ -77,6 +80,7 @@ public String describe() { public record Intermediate( int tsHashChannel, int timeBucketChannel, + Rounding.Prepared timeBucket, List groupings, List rates, List nonRates, @@ -97,6 +101,7 @@ public Operator get(DriverContext driverContext) { new BlockHash.GroupSpec(timeBucketChannel, ElementType.LONG) ); return new TimeSeriesAggregationOperator( + timeBucket, aggregators, () -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, true), driverContext diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java index 6956e9ba59d90..c1ed7ae510172 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java @@ -270,6 +270,7 @@ public void close() { Operator intialAgg = new TimeSeriesAggregationOperatorFactories.Initial( 1, 3, + rounding, IntStream.range(0, nonBucketGroupings.size()).mapToObj(n -> new BlockHash.GroupSpec(5 + n, ElementType.BYTES_REF)).toList(), List.of(new SupplierWithChannels(new RateLongAggregatorFunctionSupplier(unitInMillis), List.of(4, 2))), List.of(), @@ -280,6 +281,7 @@ public void close() { Operator intermediateAgg = new TimeSeriesAggregationOperatorFactories.Intermediate( 0, 1, + rounding, IntStream.range(0, nonBucketGroupings.size()).mapToObj(n -> new BlockHash.GroupSpec(5 + n, ElementType.BYTES_REF)).toList(), List.of(new SupplierWithChannels(new RateLongAggregatorFunctionSupplier(unitInMillis), List.of(2, 3, 4))), List.of(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java index 1af5bf8a20578..8bc6dd532cd15 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java @@ -262,16 +262,7 @@ public boolean foldable() { @Override public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { if (field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS) { - Rounding.Prepared preparedRounding; - if (buckets.dataType().isWholeNumber()) { - int b = ((Number) buckets.fold(toEvaluator.foldCtx())).intValue(); - long f = foldToLong(toEvaluator.foldCtx(), from); - long t = foldToLong(toEvaluator.foldCtx(), to); - preparedRounding = new DateRoundingPicker(b, f, t).pickRounding().prepareForUnknown(); - } else { - assert DataType.isTemporalAmount(buckets.dataType()) : "Unexpected span data type [" + buckets.dataType() + "]"; - preparedRounding = DateTrunc.createRounding(buckets.fold(toEvaluator.foldCtx()), DEFAULT_TZ); - } + Rounding.Prepared preparedRounding = getDateRounding(toEvaluator.foldCtx()); return DateTrunc.evaluator(field.dataType(), source(), toEvaluator.apply(field), preparedRounding); } if (field.dataType().isNumeric()) { @@ -295,6 +286,30 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { throw EsqlIllegalArgumentException.illegalDataType(field.dataType()); } + /** + * Returns the date rounding from this bucket function if the target field is a date type; otherwise, returns null. + */ + public Rounding.Prepared getDateRoundingOrNull(FoldContext foldCtx) { + if (field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS) { + return getDateRounding(foldCtx); + } else { + return null; + } + } + + private Rounding.Prepared getDateRounding(FoldContext foldContext) { + assert field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS : "expected date type; got " + field; + if (buckets.dataType().isWholeNumber()) { + int b = ((Number) buckets.fold(foldContext)).intValue(); + long f = foldToLong(foldContext, from); + long t = foldToLong(foldContext, to); + return new DateRoundingPicker(b, f, t).pickRounding().prepareForUnknown(); + } else { + assert DataType.isTemporalAmount(buckets.dataType()) : "Unexpected span data type [" + buckets.dataType() + "]"; + return DateTrunc.createRounding(buckets.fold(foldContext), DEFAULT_TZ); + } + } + private record DateRoundingPicker(int buckets, long from, long to) { Rounding pickRounding() { Rounding prev = LARGEST_HUMAN_DATE_ROUNDING; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java index 69ab34534de1d..2f925470eb41d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java @@ -123,7 +123,7 @@ public TranslateTimeSeriesAggregate() { @Override protected LogicalPlan rule(Aggregate aggregate) { - if (aggregate instanceof TimeSeriesAggregate ts) { + if (aggregate instanceof TimeSeriesAggregate ts && ts.timeBucket() == null) { return translate(ts); } else { return aggregate; @@ -226,7 +226,8 @@ LogicalPlan translate(TimeSeriesAggregate aggregate) { newChild.source(), newChild, firstPassGroupings, - mergeExpressions(firstPassAggs, firstPassGroupings) + mergeExpressions(firstPassAggs, firstPassGroupings), + (Bucket) Alias.unwrap(timeBucket) ); return new Aggregate(firstPhase.source(), firstPhase, secondPassGroupings, mergeExpressions(secondPassAggs, secondPassGroupings)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index db1cffb80d1e4..c54073004c365 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -315,7 +315,7 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) { final Stats stats = stats(source(ctx), ctx.grouping, ctx.stats); return input -> { if (input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) { - return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates); + return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null); } else { return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index cc1c1cd2cf7f3..3a145dcd0110c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -9,10 +9,12 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; import java.io.IOException; import java.util.List; @@ -28,17 +30,28 @@ public class TimeSeriesAggregate extends Aggregate { TimeSeriesAggregate::new ); - public TimeSeriesAggregate(Source source, LogicalPlan child, List groupings, List aggregates) { + private final Bucket timeBucket; + + public TimeSeriesAggregate( + Source source, + LogicalPlan child, + List groupings, + List aggregates, + Bucket timeBucket + ) { super(source, child, groupings, aggregates); + this.timeBucket = timeBucket; } public TimeSeriesAggregate(StreamInput in) throws IOException { super(in); + this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp)); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeOptionalWriteable(timeBucket); } @Override @@ -48,16 +61,21 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates); + return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket); } @Override public TimeSeriesAggregate replaceChild(LogicalPlan newChild) { - return new TimeSeriesAggregate(source(), newChild, groupings, aggregates); + return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket); } @Override public TimeSeriesAggregate with(LogicalPlan child, List newGroupings, List newAggregates) { - return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates); + return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket); + } + + @Nullable + public Bucket timeBucket() { + return timeBucket; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesAggregateExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesAggregateExec.java index 69d385181b6e2..e1c220b5f623b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesAggregateExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/TimeSeriesAggregateExec.java @@ -7,15 +7,19 @@ package org.elasticsearch.xpack.esql.plan.physical; +import org.elasticsearch.common.Rounding; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import java.io.IOException; @@ -32,6 +36,8 @@ public class TimeSeriesAggregateExec extends AggregateExec { TimeSeriesAggregateExec::new ); + private final Bucket timeBucket; + public TimeSeriesAggregateExec( Source source, PhysicalPlan child, @@ -39,18 +45,22 @@ public TimeSeriesAggregateExec( List aggregates, AggregatorMode mode, List intermediateAttributes, - Integer estimatedRowSize + Integer estimatedRowSize, + Bucket timeBucket ) { super(source, child, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize); + this.timeBucket = timeBucket; } private TimeSeriesAggregateExec(StreamInput in) throws IOException { super(in); + this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp)); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeOptionalWriteable(timeBucket); } @Override @@ -68,7 +78,8 @@ protected NodeInfo info() { aggregates(), getMode(), intermediateAttributes(), - estimatedRowSize() + estimatedRowSize(), + timeBucket ); } @@ -81,7 +92,8 @@ public TimeSeriesAggregateExec replaceChild(PhysicalPlan newChild) { aggregates(), getMode(), intermediateAttributes(), - estimatedRowSize() + estimatedRowSize(), + timeBucket ); } @@ -93,7 +105,8 @@ public TimeSeriesAggregateExec withMode(AggregatorMode newMode) { aggregates(), newMode, intermediateAttributes(), - estimatedRowSize() + estimatedRowSize(), + timeBucket ); } @@ -106,7 +119,23 @@ protected AggregateExec withEstimatedSize(int estimatedRowSize) { aggregates(), getMode(), intermediateAttributes(), - estimatedRowSize + estimatedRowSize, + timeBucket ); } + + public Bucket timeBucket() { + return timeBucket; + } + + public Rounding.Prepared timeBucketRounding(FoldContext foldContext) { + if (timeBucket == null) { + return null; + } + Rounding.Prepared rounding = timeBucket.getDateRoundingOrNull(foldContext); + if (rounding == null) { + throw new EsqlIllegalArgumentException("expected TBUCKET; got ", timeBucket); + } + return rounding; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index b8922f11e3db9..92e3a7132b010 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -174,8 +174,9 @@ else if (aggregatorMode.isOutputPartial()) { s -> aggregatorFactories.add(s.supplier.groupingAggregatorFactory(s.mode, s.channels)) ); // time-series aggregation - if (aggregateExec instanceof TimeSeriesAggregateExec) { + if (aggregateExec instanceof TimeSeriesAggregateExec ts) { operatorFactory = new TimeSeriesAggregationOperator.Factory( + ts.timeBucketRounding(context.foldCtx()), groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(), aggregatorMode, aggregatorFactories, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java index ca1ee8af95925..473ca4f92c8a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java @@ -131,7 +131,7 @@ static List intermediateAttributes(Aggregate aggregate) { } static AggregateExec aggExec(Aggregate aggregate, PhysicalPlan child, AggregatorMode aggMode, List intermediateAttributes) { - if (aggregate instanceof TimeSeriesAggregate) { + if (aggregate instanceof TimeSeriesAggregate ts) { return new TimeSeriesAggregateExec( aggregate.source(), child, @@ -139,7 +139,8 @@ static AggregateExec aggExec(Aggregate aggregate, PhysicalPlan child, Aggregator aggregate.aggregates(), aggMode, intermediateAttributes, - null + null, + ts.timeBucket() ); } else { return new AggregateExec( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index 0cc252e2133f3..4a5c3381351e9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -132,6 +132,7 @@ import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; import org.junit.BeforeClass; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -6718,6 +6719,7 @@ public void testTranslateMetricsWithoutGrouping() { Aggregate finalAggs = as(limit.child(), Aggregate.class); assertThat(finalAggs, not(instanceOf(TimeSeriesAggregate.class))); TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class); + assertNull(aggsByTsid.timeBucket()); as(aggsByTsid.child(), EsRelation.class); assertThat(finalAggs.aggregates(), hasSize(1)); @@ -6738,6 +6740,7 @@ public void testTranslateMixedAggsWithoutGrouping() { Aggregate finalAggs = as(limit.child(), Aggregate.class); assertThat(finalAggs, not(instanceOf(TimeSeriesAggregate.class))); TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class); + assertNull(aggsByTsid.timeBucket()); as(aggsByTsid.child(), EsRelation.class); assertThat(finalAggs.aggregates(), hasSize(2)); @@ -6768,6 +6771,7 @@ public void testTranslateMixedAggsWithMathWithoutGrouping() { assertThat(finalAggs.aggregates(), hasSize(2)); TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class); assertThat(aggsByTsid.aggregates(), hasSize(2)); + assertNull(aggsByTsid.timeBucket()); Eval addEval = as(aggsByTsid.child(), Eval.class); assertThat(addEval.fields(), hasSize(1)); Add add = as(Alias.unwrap(addEval.fields().get(0)), Add.class); @@ -6800,6 +6804,7 @@ public void testTranslateMetricsGroupedByOneDimension() { assertThat(aggsByCluster.aggregates(), hasSize(2)); TimeSeriesAggregate aggsByTsid = as(aggsByCluster.child(), TimeSeriesAggregate.class); assertThat(aggsByTsid.aggregates(), hasSize(2)); // _tsid is dropped + assertNull(aggsByTsid.timeBucket()); as(aggsByTsid.child(), EsRelation.class); Sum sum = as(Alias.unwrap(aggsByCluster.aggregates().get(0)), Sum.class); @@ -6826,6 +6831,7 @@ public void testTranslateMetricsGroupedByTwoDimension() { assertThat(finalAggs.aggregates(), hasSize(4)); TimeSeriesAggregate aggsByTsid = as(finalAggs.child(), TimeSeriesAggregate.class); assertThat(aggsByTsid.aggregates(), hasSize(3)); // _tsid is dropped + assertNull(aggsByTsid.timeBucket()); as(aggsByTsid.child(), EsRelation.class); Div div = as(Alias.unwrap(eval.fields().get(0)), Div.class); @@ -6861,6 +6867,8 @@ public void testTranslateMetricsGroupedByTimeBucket() { assertThat(finalAgg.aggregates(), hasSize(2)); TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class); assertThat(aggsByTsid.aggregates(), hasSize(2)); // _tsid is dropped + assertNotNull(aggsByTsid.timeBucket()); + assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofHours(1))); Eval eval = as(aggsByTsid.child(), Eval.class); assertThat(eval.fields(), hasSize(1)); as(eval.child(), EsRelation.class); @@ -6894,6 +6902,8 @@ public void testTranslateMetricsGroupedByTimeBucketAndDimensions() { Aggregate finalAgg = as(eval.child(), Aggregate.class); assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class))); TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class); + assertNotNull(aggsByTsid.timeBucket()); + assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5))); Eval bucket = as(aggsByTsid.child(), Eval.class); as(bucket.child(), EsRelation.class); assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id())); @@ -6933,6 +6943,8 @@ public void testTranslateMixedAggsGroupedByTimeBucketAndDimensions() { Aggregate finalAgg = as(eval.child(), Aggregate.class); assertThat(finalAgg, not(instanceOf(TimeSeriesAggregate.class))); TimeSeriesAggregate aggsByTsid = as(finalAgg.child(), TimeSeriesAggregate.class); + assertNotNull(aggsByTsid.timeBucket()); + assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(5))); Eval bucket = as(aggsByTsid.child(), Eval.class); as(bucket.child(), EsRelation.class); assertThat(Expressions.attribute(div.left()).id(), equalTo(finalAgg.aggregates().get(0).id())); @@ -6992,6 +7004,8 @@ public void testAdjustMetricsRateBeforeFinalAgg() { TimeSeriesAggregate aggsByTsid = as(evalRound.child(), TimeSeriesAggregate.class); assertThat(aggsByTsid.aggregates(), hasSize(3)); // rate, cluster, bucket assertThat(aggsByTsid.groupings(), hasSize(2)); + assertNotNull(aggsByTsid.timeBucket()); + assertThat(aggsByTsid.timeBucket().buckets().fold(FoldContext.small()), equalTo(Duration.ofMinutes(1))); Eval evalBucket = as(aggsByTsid.child(), Eval.class); assertThat(evalBucket.fields(), hasSize(1)); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index 2ece86d0a4631..515564052e79b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -2332,7 +2332,11 @@ public void testSimpleMetricsWithStats() { EMPTY, unresolvedTSRelation("foo"), List.of(attribute("ts")), - List.of(new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), attribute("ts")) + List.of( + new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), + attribute("ts") + ), + null ) ); assertStatement( @@ -2341,7 +2345,11 @@ public void testSimpleMetricsWithStats() { EMPTY, unresolvedTSRelation("foo,bar"), List.of(attribute("ts")), - List.of(new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), attribute("ts")) + List.of( + new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), + attribute("ts") + ), + null ) ); assertStatement( @@ -2363,7 +2371,8 @@ public void testSimpleMetricsWithStats() { ) ), attribute("ts") - ) + ), + null ) ); assertStatement( @@ -2372,7 +2381,8 @@ public void testSimpleMetricsWithStats() { EMPTY, unresolvedTSRelation("foo*"), List.of(), - List.of(new Alias(EMPTY, "count(errors)", new UnresolvedFunction(EMPTY, "count", DEFAULT, List.of(attribute("errors"))))) + List.of(new Alias(EMPTY, "count(errors)", new UnresolvedFunction(EMPTY, "count", DEFAULT, List.of(attribute("errors"))))), + null ) ); assertStatement( @@ -2381,7 +2391,8 @@ public void testSimpleMetricsWithStats() { EMPTY, unresolvedTSRelation("foo*"), List.of(), - List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))) + List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))), + null ) ); assertStatement( @@ -2390,7 +2401,8 @@ public void testSimpleMetricsWithStats() { EMPTY, unresolvedTSRelation("foo*"), List.of(), - List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))) + List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))), + null ) ); assertStatement( @@ -2399,7 +2411,8 @@ public void testSimpleMetricsWithStats() { EMPTY, unresolvedTSRelation("foo*"), List.of(), - List.of(new Alias(EMPTY, "a1(b2)", new UnresolvedFunction(EMPTY, "a1", DEFAULT, List.of(attribute("b2"))))) + List.of(new Alias(EMPTY, "a1(b2)", new UnresolvedFunction(EMPTY, "a1", DEFAULT, List.of(attribute("b2"))))), + null ) ); assertStatement( @@ -2412,7 +2425,8 @@ public void testSimpleMetricsWithStats() { new Alias(EMPTY, "b", new UnresolvedFunction(EMPTY, "min", DEFAULT, List.of(attribute("a")))), attribute("c"), attribute("d.e") - ) + ), + null ) ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AggregateSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AggregateSerializationTests.java index 0370b1f30267e..055b094dd8a95 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AggregateSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/AggregateSerializationTests.java @@ -35,7 +35,7 @@ protected Aggregate createTestInstance() { if (randomBoolean()) { return new Aggregate(source, child, groupings, aggregates); } else { - return new TimeSeriesAggregate(source, child, groupings, aggregates); + return new TimeSeriesAggregate(source, child, groupings, aggregates, null); } } @@ -76,7 +76,7 @@ protected Aggregate mutateInstance(Aggregate instance) throws IOException { case 2 -> aggregates = randomValueOtherThan(aggregates, AggregateSerializationTests::randomAggregates); } if (instance instanceof TimeSeriesAggregate) { - return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates); + return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, null); } else { return new Aggregate(instance.source(), child, groupings, aggregates); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExecSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExecSerializationTests.java index 8acb51c10beca..caf516014df14 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExecSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExecSerializationTests.java @@ -31,7 +31,7 @@ public static AggregateExec randomAggregateExec(int depth) { if (randomBoolean()) { return new AggregateExec(source, child, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize); } else { - return new TimeSeriesAggregateExec(source, child, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize); + return new TimeSeriesAggregateExec(source, child, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, null); } } @@ -68,7 +68,8 @@ protected AggregateExec mutateInstance(AggregateExec instance) throws IOExceptio aggregates, mode, intermediateAttributes, - estimatedRowSize + estimatedRowSize, + null ); } else { return new AggregateExec(instance.source(), child, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize);