Skip to content

Commit e47cfd0

Browse files
dnhatnomricohenn
authored and committed
Add time-series aggregation operator (elastic#125537)
We will need custom logic in the time-series aggregation operator, such as smoothing the rate across buckets. To address this, this PR introduces a TimeSeriesAggregationOperator that extends HashAggregationOperator to support this additional logic.
1 parent a9e447d commit e47cfd0

File tree

6 files changed

+116
-64
lines changed

6 files changed

+116
-64
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -32,63 +32,40 @@
3232
import java.util.Arrays;
3333
import java.util.List;
3434
import java.util.Objects;
35-
import java.util.function.Function;
3635
import java.util.function.Supplier;
3736

3837
import static java.util.Objects.requireNonNull;
3938
import static java.util.stream.Collectors.joining;
4039

4140
public class HashAggregationOperator implements Operator {
4241

43-
public static final class HashAggregationOperatorFactory implements OperatorFactory {
44-
final Function<DriverContext, BlockHash> blockHashSupplier;
45-
final AggregatorMode aggregatorMode;
46-
final List<GroupingAggregator.Factory> aggregators;
47-
final int maxPageSize;
48-
final AnalysisRegistry analysisRegistry;
49-
50-
public HashAggregationOperatorFactory(
51-
Function<DriverContext, BlockHash> blockHashSupplier,
52-
AggregatorMode aggregatorMode,
53-
List<GroupingAggregator.Factory> aggregators,
54-
int maxPageSize,
55-
AnalysisRegistry analysisRegistry
56-
) {
57-
this.blockHashSupplier = blockHashSupplier;
58-
this.aggregatorMode = aggregatorMode;
59-
this.aggregators = aggregators;
60-
this.maxPageSize = maxPageSize;
61-
this.analysisRegistry = analysisRegistry;
62-
63-
}
64-
65-
public HashAggregationOperatorFactory(
66-
List<BlockHash.GroupSpec> groups,
67-
AggregatorMode aggregatorMode,
68-
List<GroupingAggregator.Factory> aggregators,
69-
int maxPageSize,
70-
AnalysisRegistry analysisRegistry
71-
) {
42+
public record HashAggregationOperatorFactory(
43+
List<BlockHash.GroupSpec> groups,
44+
AggregatorMode aggregatorMode,
45+
List<GroupingAggregator.Factory> aggregators,
46+
int maxPageSize,
47+
AnalysisRegistry analysisRegistry
48+
) implements OperatorFactory {
49+
@Override
50+
public Operator get(DriverContext driverContext) {
7251
if (groups.stream().anyMatch(BlockHash.GroupSpec::isCategorize)) {
73-
this.blockHashSupplier = driverContext -> BlockHash.buildCategorizeBlockHash(
74-
groups,
75-
aggregatorMode,
76-
driverContext.blockFactory(),
77-
analysisRegistry,
78-
maxPageSize
52+
return new HashAggregationOperator(
53+
aggregators,
54+
() -> BlockHash.buildCategorizeBlockHash(
55+
groups,
56+
aggregatorMode,
57+
driverContext.blockFactory(),
58+
analysisRegistry,
59+
maxPageSize
60+
),
61+
driverContext
7962
);
80-
} else {
81-
this.blockHashSupplier = driverContext -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false);
8263
}
83-
this.aggregatorMode = aggregatorMode;
84-
this.aggregators = aggregators;
85-
this.maxPageSize = maxPageSize;
86-
this.analysisRegistry = analysisRegistry;
87-
}
88-
89-
@Override
90-
public Operator get(DriverContext driverContext) {
91-
return new HashAggregationOperator(aggregators, () -> blockHashSupplier.apply(driverContext), driverContext);
64+
return new HashAggregationOperator(
65+
aggregators,
66+
() -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false),
67+
driverContext
68+
);
9269
}
9370

9471
@Override
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.compute.Describable;
11+
import org.elasticsearch.compute.aggregation.AggregatorMode;
12+
import org.elasticsearch.compute.aggregation.GroupingAggregator;
13+
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
14+
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
15+
16+
import java.util.List;
17+
import java.util.function.Supplier;
18+
19+
import static java.util.stream.Collectors.joining;
20+
21+
/**
22+
* A specialized version of {@link HashAggregationOperator} that aggregates time-series aggregations from time-series sources.
23+
*/
24+
public class TimeSeriesAggregationOperator extends HashAggregationOperator {
25+
26+
public record Factory(
27+
BlockHash.GroupSpec tsidGroup,
28+
BlockHash.GroupSpec timestampGroup,
29+
AggregatorMode aggregatorMode,
30+
List<GroupingAggregator.Factory> aggregators,
31+
int maxPageSize
32+
) implements OperatorFactory {
33+
@Override
34+
public Operator get(DriverContext driverContext) {
35+
return new HashAggregationOperator(aggregators, () -> {
36+
if (aggregatorMode.isInputPartial()) {
37+
return BlockHash.build(
38+
List.of(tsidGroup, timestampGroup),
39+
driverContext.blockFactory(),
40+
maxPageSize,
41+
true // we can enable optimizations as the inputs are vectors
42+
);
43+
} else {
44+
return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory());
45+
}
46+
}, driverContext);
47+
}
48+
49+
@Override
50+
public String describe() {
51+
return "MetricsAggregationOperator[mode = "
52+
+ "<not-needed>"
53+
+ ", aggs = "
54+
+ aggregators.stream().map(Describable::describe).collect(joining(", "))
55+
+ "]";
56+
}
57+
}
58+
59+
public TimeSeriesAggregationOperator(
60+
List<GroupingAggregator.Factory> aggregators,
61+
Supplier<BlockHash> blockHash,
62+
DriverContext driverContext
63+
) {
64+
super(aggregators, blockHash, driverContext);
65+
}
66+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public Operator get(DriverContext driverContext) {
6161
aggregators.add(f.supplier.groupingAggregatorFactory(AggregatorMode.INITIAL, f.channels));
6262
}
6363
aggregators.addAll(valuesAggregatorForGroupings(groupings, timeBucketChannel));
64-
return new HashAggregationOperator(
64+
return new TimeSeriesAggregationOperator(
6565
aggregators,
6666
() -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext.blockFactory()),
6767
driverContext
@@ -96,9 +96,9 @@ public Operator get(DriverContext driverContext) {
9696
new BlockHash.GroupSpec(tsHashChannel, ElementType.BYTES_REF),
9797
new BlockHash.GroupSpec(timeBucketChannel, ElementType.LONG)
9898
);
99-
return new HashAggregationOperator(
99+
return new TimeSeriesAggregationOperator(
100100
aggregators,
101-
() -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, false),
101+
() -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, true),
102102
driverContext
103103
);
104104
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
3030
import org.elasticsearch.xpack.esql.expression.function.Param;
3131
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
32-
import org.elasticsearch.xpack.esql.planner.ToAggregator;
32+
import org.elasticsearch.xpack.esql.planner.ToTimeSeriesAggregator;
3333

3434
import java.io.IOException;
3535
import java.time.Duration;
@@ -40,7 +40,7 @@
4040
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
4141
import static org.elasticsearch.xpack.esql.core.util.CollectionUtils.nullSafeList;
4242

43-
public class Rate extends AggregateFunction implements OptionalArgument, ToAggregator {
43+
public class Rate extends AggregateFunction implements OptionalArgument, ToTimeSeriesAggregator {
4444
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Rate", Rate::new);
4545
private static final TimeValue DEFAULT_UNIT = TimeValue.timeValueSeconds(1);
4646

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,12 @@
1313
import org.elasticsearch.compute.aggregation.FilteredAggregatorFunctionSupplier;
1414
import org.elasticsearch.compute.aggregation.GroupingAggregator;
1515
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
16-
import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
1716
import org.elasticsearch.compute.data.ElementType;
18-
import org.elasticsearch.compute.lucene.TimeSeriesSortedSourceOperatorFactory;
1917
import org.elasticsearch.compute.operator.AggregationOperator;
2018
import org.elasticsearch.compute.operator.EvalOperator;
2119
import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory;
2220
import org.elasticsearch.compute.operator.Operator;
21+
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
2322
import org.elasticsearch.index.analysis.AnalysisRegistry;
2423
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
2524
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -175,21 +174,16 @@ else if (aggregatorMode.isOutputPartial()) {
175174
s -> aggregatorFactories.add(s.supplier.groupingAggregatorFactory(s.mode, s.channels))
176175
);
177176
// time-series aggregation
178-
if (source.sourceOperatorFactory instanceof TimeSeriesSortedSourceOperatorFactory
179-
&& aggregatorMode.isInputPartial() == false
177+
if (Expressions.anyMatch(aggregates, a -> a instanceof ToTimeSeriesAggregator)
180178
&& groupSpecs.size() == 2
181179
&& groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE
182180
&& groupSpecs.get(1).attribute.dataType() == DataType.LONG) {
183-
operatorFactory = new HashAggregationOperatorFactory(
184-
driverContext -> new TimeSeriesBlockHash(
185-
groupSpecs.get(0).channel,
186-
groupSpecs.get(1).channel,
187-
driverContext.blockFactory()
188-
),
181+
operatorFactory = new TimeSeriesAggregationOperator.Factory(
182+
groupSpecs.get(0).toHashGroupSpec(),
183+
groupSpecs.get(1).toHashGroupSpec(),
189184
aggregatorMode,
190185
aggregatorFactories,
191-
context.pageSize(aggregateExec.estimatedRowSize()),
192-
analysisRegistry
186+
context.pageSize(aggregateExec.estimatedRowSize())
193187
);
194188
// ordinal grouping
195189
} else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.planner;
9+
10+
/**
11+
* Marker interface indicating that an aggregate function is a time-series aggregator and requires a time-series source.
12+
*/
13+
public interface ToTimeSeriesAggregator extends ToAggregator {
14+
15+
}

0 commit comments

Comments
 (0)