-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add time-series aggregation operator #125537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.operator; | ||
|
|
||
| import org.elasticsearch.compute.Describable; | ||
| import org.elasticsearch.compute.aggregation.AggregatorMode; | ||
| import org.elasticsearch.compute.aggregation.GroupingAggregator; | ||
| import org.elasticsearch.compute.aggregation.blockhash.BlockHash; | ||
| import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash; | ||
|
|
||
| import java.util.List; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static java.util.stream.Collectors.joining; | ||
|
|
||
| /** | ||
| * A specialized version of {@link HashAggregationOperator} that aggregates time-series aggregations from time-series sources. | ||
| */ | ||
| public class TimeSeriesAggregationOperator extends HashAggregationOperator { | ||
|
|
||
| public record Factory( | ||
| BlockHash.GroupSpec tsidGroup, | ||
| BlockHash.GroupSpec timestampGroup, | ||
| AggregatorMode aggregatorMode, | ||
| List<GroupingAggregator.Factory> aggregators, | ||
| int maxPageSize | ||
| ) implements OperatorFactory { | ||
| @Override | ||
| public Operator get(DriverContext driverContext) { | ||
| return new HashAggregationOperator(aggregators, () -> { | ||
| if (aggregatorMode.isInputPartial()) { | ||
| return BlockHash.build( | ||
| List.of(tsidGroup, timestampGroup), | ||
| driverContext.blockFactory(), | ||
| maxPageSize, | ||
| true // we can enable optimizations as the inputs are vectors | ||
| ); | ||
| } else { | ||
| return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory()); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't the first arg be |
||
| } | ||
| }, driverContext); | ||
| } | ||
|
|
||
| @Override | ||
| public String describe() { | ||
| return "MetricsAggregationOperator[mode = " | ||
| + "<not-needed>" | ||
| + ", aggs = " | ||
| + aggregators.stream().map(Describable::describe).collect(joining(", ")) | ||
| + "]"; | ||
| } | ||
| } | ||
|
|
||
| public TimeSeriesAggregationOperator( | ||
| List<GroupingAggregator.Factory> aggregators, | ||
| Supplier<BlockHash> blockHash, | ||
| DriverContext driverContext | ||
| ) { | ||
| super(aggregators, blockHash, driverContext); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.esql.planner; | ||
|
|
||
| /** | ||
| * An interface indicates that this is a time-series aggregator and it requires time-series source | ||
| */ | ||
| public interface ToTimeSeriesAggregator extends ToAggregator { | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the smoothing going to be added to implementations of this interface or to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this interface serves as a placeholder for detecting time-series aggregations that require _tsid and @timestamp, such as rate (last, ...). |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double check, currently the input is not partial during the initial grouping by tsid on data node, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's correct.