diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index db30c5e66f991..c47b6cebdaddc 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -32,7 +32,6 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.function.Function; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -40,55 +39,33 @@ public class HashAggregationOperator implements Operator { - public static final class HashAggregationOperatorFactory implements OperatorFactory { - final Function blockHashSupplier; - final AggregatorMode aggregatorMode; - final List aggregators; - final int maxPageSize; - final AnalysisRegistry analysisRegistry; - - public HashAggregationOperatorFactory( - Function blockHashSupplier, - AggregatorMode aggregatorMode, - List aggregators, - int maxPageSize, - AnalysisRegistry analysisRegistry - ) { - this.blockHashSupplier = blockHashSupplier; - this.aggregatorMode = aggregatorMode; - this.aggregators = aggregators; - this.maxPageSize = maxPageSize; - this.analysisRegistry = analysisRegistry; - - } - - public HashAggregationOperatorFactory( - List groups, - AggregatorMode aggregatorMode, - List aggregators, - int maxPageSize, - AnalysisRegistry analysisRegistry - ) { + public record HashAggregationOperatorFactory( + List groups, + AggregatorMode aggregatorMode, + List aggregators, + int maxPageSize, + AnalysisRegistry analysisRegistry + ) implements OperatorFactory { + @Override + public Operator get(DriverContext driverContext) { if (groups.stream().anyMatch(BlockHash.GroupSpec::isCategorize)) { - this.blockHashSupplier = driverContext -> BlockHash.buildCategorizeBlockHash( - groups, - aggregatorMode, - driverContext.blockFactory(), - analysisRegistry, - maxPageSize + return new HashAggregationOperator( + aggregators, + () -> BlockHash.buildCategorizeBlockHash( + groups, + aggregatorMode, + driverContext.blockFactory(), + analysisRegistry, + maxPageSize + ), + driverContext ); - } else { - this.blockHashSupplier = driverContext -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false); } - this.aggregatorMode = aggregatorMode; - this.aggregators = aggregators; - this.maxPageSize = maxPageSize; - this.analysisRegistry = analysisRegistry; - } - - @Override - public Operator get(DriverContext driverContext) { - return new HashAggregationOperator(aggregators, () -> blockHashSupplier.apply(driverContext), driverContext); + return new HashAggregationOperator( + aggregators, + () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false), + driverContext + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java new file mode 100644 index 0000000000000..69e9e0fd69486 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.compute.Describable; +import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.aggregation.GroupingAggregator; +import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash; + +import java.util.List; +import java.util.function.Supplier; + +import static java.util.stream.Collectors.joining; + +/** + * A specialized version of {@link HashAggregationOperator} that aggregates time-series aggregations from time-series sources. + */ +public class TimeSeriesAggregationOperator extends HashAggregationOperator { + + public record Factory( + BlockHash.GroupSpec tsidGroup, + BlockHash.GroupSpec timestampGroup, + AggregatorMode aggregatorMode, + List aggregators, + int maxPageSize + ) implements OperatorFactory { + @Override + public Operator get(DriverContext driverContext) { + return new HashAggregationOperator(aggregators, () -> { + if (aggregatorMode.isInputPartial()) { + return BlockHash.build( + List.of(tsidGroup, timestampGroup), + driverContext.blockFactory(), + maxPageSize, + true // we can enable optimizations as the inputs are vectors + ); + } else { + return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory()); + } + }, driverContext); + } + + @Override + public String describe() { + return "MetricsAggregationOperator[mode = " + + "" + + ", aggs = " + + aggregators.stream().map(Describable::describe).collect(joining(", ")) + + "]"; + } + } + + public TimeSeriesAggregationOperator( + List aggregators, + Supplier blockHash, + DriverContext driverContext + ) { + super(aggregators, blockHash, driverContext); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java index c6602be15041b..7e58968bebb21 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java @@ -61,7 +61,7 @@ public Operator get(DriverContext driverContext) { aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.INITIAL, f.channels)); } aggregators.addAll(valuesAggregatorForGroupings(groupings, timeBucketChannel)); - return new HashAggregationOperator( + return new TimeSeriesAggregationOperator( aggregators, () -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext.blockFactory()), driverContext @@ -96,9 +96,9 @@ public Operator get(DriverContext driverContext) { new BlockHash.GroupSpec(tsHashChannel, ElementType.BYTES_REF), new BlockHash.GroupSpec(timeBucketChannel, ElementType.LONG) ); - return new HashAggregationOperator( + return new TimeSeriesAggregationOperator( aggregators, - () -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, false), + () -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, true), driverContext ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java index ae385da4c86e3..7334b1133e53b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; import org.elasticsearch.xpack.esql.expression.function.Param; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; -import org.elasticsearch.xpack.esql.planner.ToAggregator; +import org.elasticsearch.xpack.esql.planner.ToTimeSeriesAggregator; import java.io.IOException; import java.time.Duration; @@ -40,7 +40,7 @@ import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType; import static org.elasticsearch.xpack.esql.core.util.CollectionUtils.nullSafeList; -public class Rate extends AggregateFunction implements OptionalArgument, ToAggregator { +public class Rate extends AggregateFunction implements OptionalArgument, ToTimeSeriesAggregator { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Rate", Rate::new); private static final TimeValue DEFAULT_UNIT = TimeValue.timeValueSeconds(1); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index 4a57418d99ed7..2cf05dbd4efb7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -13,13 +13,12 @@ import org.elasticsearch.compute.aggregation.FilteredAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; -import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash; import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorFactory; import org.elasticsearch.compute.operator.AggregationOperator; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.InvalidArgumentException; @@ -175,21 +174,16 @@ else if (aggregatorMode.isOutputPartial()) { s -> aggregatorFactories.add(s.supplier.groupingAggregatorFactory(s.mode, s.channels)) ); // time-series aggregation - if (source.sourceOperatorFactory instanceof TimeSeriesSortedSourceOperatorFactory - && aggregatorMode.isInputPartial() == false + if (Expressions.anyMatch(aggregates, a -> a instanceof ToTimeSeriesAggregator) && groupSpecs.size() == 2 && groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE && groupSpecs.get(1).attribute.dataType() == DataType.LONG) { - operatorFactory = new HashAggregationOperatorFactory( - driverContext -> new TimeSeriesBlockHash( - groupSpecs.get(0).channel, - groupSpecs.get(1).channel, - driverContext.blockFactory() - ), + operatorFactory = new TimeSeriesAggregationOperator.Factory( + groupSpecs.get(0).toHashGroupSpec(), + groupSpecs.get(1).toHashGroupSpec(), aggregatorMode, aggregatorFactories, - context.pageSize(aggregateExec.estimatedRowSize()), - analysisRegistry + context.pageSize(aggregateExec.estimatedRowSize()) ); // ordinal grouping } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/ToTimeSeriesAggregator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/ToTimeSeriesAggregator.java new file mode 100644 index 0000000000000..869bad8a0f111 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/ToTimeSeriesAggregator.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.planner; + +/** + * An interface indicates that this is a time-series aggregator and it requires time-series source + */ +public interface ToTimeSeriesAggregator extends ToAggregator { + +}