-
Notifications
You must be signed in to change notification settings - Fork 25.4k
Use lossy summation for time-series aggregations #132625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
beb2b80
to
7c0d9f6
Compare
{
"operator": "TimeSeriesAggregationOperator[blockHash=BytesRefLongBlockHash{keys=[BytesRefKey[channel=3], LongKey[channel=2]], entries=546, size=56368b}, aggregators=[GroupingAggregator[aggregatorFunction=SumDoubleGroupingAggregatorFunction[channels=[4]], mode=INITIAL], GroupingAggregator[aggregatorFunction=CountGroupingAggregatorFunction[channels=[4]], mode=INITIAL], GroupingAggregator[aggregatorFunction=ValuesBytesRefGroupingAggregatorFunction[channels=[5]], mode=INITIAL]]]",
"status": {
"hash_nanos": 2949462,
"aggregation_nanos": 22679014, // <- 22ms
"pages_processed": 546,
"rows_received": 982982,
"rows_emitted": 546,
"emit_nanos": 121951
}
} {
"operator": "TimeSeriesAggregationOperator[blockHash=BytesRefLongBlockHash{keys=[BytesRefKey[channel=3], LongKey[channel=2]], entries=546, size=56368b}, aggregators=[GroupingAggregator[aggregatorFunction=LossySumDoubleGroupingAggregatorFunction[channels=[4]], mode=INITIAL], GroupingAggregator[aggregatorFunction=CountGroupingAggregatorFunction[channels=[4]], mode=INITIAL], GroupingAggregator[aggregatorFunction=ValuesBytesRefGroupingAggregatorFunction[channels=[5]], mode=INITIAL]]]",
"status": {
"hash_nanos": 2770991,
"aggregation_nanos": 15664657, // <- 15ms
"pages_processed": 546,
"rows_received": 982982,
"rows_emitted": 546,
"emit_nanos": 72400
}
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Nhat, this is a great speed up for the TS command!
I left a question, but other than that this looks good to me.
@@ -55,11 +58,16 @@ public Avg( | |||
description = "Expression that outputs values to average." | |||
) Expression field | |||
) { | |||
this(source, field, Literal.TRUE); | |||
this(source, field, Literal.TRUE, SummationMode.COMPENSATED_LITERAL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider also using LOSSY_LITERAL
for avg
the function is used in the context of TS
source command? If so, then maybe we can do that in a followup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++. I will do it in a follow-up, as these may require bigger changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense!
@@ -61,15 +66,25 @@ public class Sum extends NumericAggregate implements SurrogateExpression { | |||
) } | |||
) | |||
public Sum(Source source, @Param(name = "number", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field) { | |||
this(source, field, Literal.TRUE); | |||
this(source, field, Literal.TRUE, SummationMode.COMPENSATED_LITERAL); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question as for the avg function.
Pinging @elastic/es-analytical-engine (Team:Analytics) |
Pinging @elastic/es-storage-engine (Team:StorageEngine) |
|
||
public static void combineIntermediate(SumState state, double inValue, double zeroDelta, boolean seen) { | ||
assert zeroDelta == 0.0 : zeroDelta; | ||
if (seen) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be seen == false
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, seen
indicates that the input value is valid.
...ql/compute/src/main/java/org/elasticsearch/compute/aggregation/LossySumDoubleAggregator.java
Outdated
Show resolved
Hide resolved
private static Avg readFrom(StreamInput in) throws IOException { | ||
Source source = Source.readFrom((PlanStreamInput) in); | ||
Expression field = in.readNamedWriteable(Expression.class); | ||
Expression filter = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use 8.16
here? Maybe add a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was copied from the super class. I took a new approach in 8ec481f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
Kahan summation can be expensive, and for time-series aggregation, a lossy summation can be a good trade-off for performance. This change introduces a lossy summation mode and makes it the default for time-series aggregations. These two summation modes for sum and avg are used internally and are not exposed to users.