ESQL: Prevent overflow on SUM using multiple aggregators #116170
# Conflicts: # x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/AbstractFallibleArrayState.java
# Conflicts: # x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java
# Conflicts: # x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java # x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockSerializationTests.java # x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/TestCaseSupplier.java
# Conflicts: # x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
Hi @ivancea, I've created a changelog YAML for you.
-         + "warningsLineNumber, warningsColumnNumber, warningsSourceText)"
+     "var warnings = $T.createWarnings(driverContext.warningsMode(), "
+         + "warningsLineNumber, warningsColumnNumber, warningsSourceText)",
+     WARNINGS
This was a bug in the PR implementing warnExceptions on aggs, which wasn't being used anywhere yet
We could also go with SumV1LongAggregator or something like that
  DriverContext driverCtx = driverContext();
  Page page = new Page(blockFactory.newLongArrayVector(new long[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 10).asBlock());
- var function = SumLongAggregatorFunction.create(driverCtx, List.of(0));
+ var function = MaxLongAggregatorFunction.create(driverCtx, List.of(0));
As the agg doesn't matter here, I changed it to another one. There are other similar fixes around tests
👍
Following the same inheritance pattern as with Scalar functions
DataType type = field().dataType();
if (type == DataType.LONG) {
    // Old aggregator without overflow handling
    if (configuration().clusterHasFeature(EsqlFeatures.FN_SUM_OVERFLOW_HANDLING) == false) {
This is the core of this PR: Choosing an aggregator based on a feature
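A minimal sketch of that selection, not the PR's actual code: `SumSelectionSketch`, `LongSum`, and the feature id string are hypothetical names used only for illustration, and an empty `Optional` stands in for "emit null and register a warning". The only real API involved is the JDK's `Math.addExact`.

```java
import java.util.Optional;
import java.util.Set;

final class SumSelectionSketch {
    // Hypothetical feature id; the diff refers to it as EsqlFeatures.FN_SUM_OVERFLOW_HANDLING.
    static final String FN_SUM_OVERFLOW_HANDLING = "fn_sum_overflow_handling";

    /** Hypothetical stand-in for an aggregator; an empty result means the sum overflowed. */
    interface LongSum {
        Optional<Long> sum(long[] values);
    }

    /** Old behavior: plain addition that silently wraps around on overflow. */
    static Optional<Long> wrappingSum(long[] values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return Optional.of(total);
    }

    /** New behavior: Math.addExact throws ArithmeticException instead of wrapping. */
    static Optional<Long> checkedSum(long[] values) {
        long total = 0;
        try {
            for (long v : values) {
                total = Math.addExact(total, v);
            }
        } catch (ArithmeticException e) {
            return Optional.empty();
        }
        return Optional.of(total);
    }

    /** Picks the implementation once, based on the features active in the cluster. */
    static LongSum choose(Set<String> activeFeatures) {
        if (activeFeatures.contains(FN_SUM_OVERFLOW_HANDLING) == false) {
            return SumSelectionSketch::wrappingSum; // old aggregator
        }
        return SumSelectionSketch::checkedSum;
    }
}
```

With an empty feature set, summing `Long.MAX_VALUE` and `1` wraps to `Long.MIN_VALUE`; with the feature present, the same input yields an empty result instead.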
}

private static List<NamedExpression> computeEntryForAgg(Expression aggregate, boolean grouping) {
    if (aggregate instanceof ToIntermediateState intermediateStateGetter) {
An interface like this looked useful for implementing custom logic. It could also be applied to other edge cases we have around AggregateMapper.
This doesn't remove the reflection-based logic we use here, it's just an easier-to-use override
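Roughly, the override could look like the sketch below. Only the `ToIntermediateState` name and the `instanceof` check come from the diff; the method on the interface, the record types, and the column names are assumptions made up for illustration, and the fallback is a placeholder for the reflection-based lookup.

```java
import java.util.List;

final class IntermediateStateSketch {
    /** Hypothetical opt-in interface: the aggregate describes its own intermediate columns. */
    interface ToIntermediateState {
        List<String> intermediateState();
    }

    /** An aggregate that relies on the generic lookup. */
    record PlainAgg(String name) {}

    /** An aggregate with a custom layout (column names invented for illustration). */
    record CustomAgg(String name) implements ToIntermediateState {
        @Override
        public List<String> intermediateState() {
            return List.of(name + ".sum", name + ".seen", name + ".failed");
        }
    }

    static List<String> computeEntryForAgg(Object aggregate) {
        if (aggregate instanceof ToIntermediateState intermediateStateGetter) {
            return intermediateStateGetter.intermediateState();
        }
        // Stand-in for the existing reflection-based path, which stays in place.
        return List.of(aggregate.getClass().getSimpleName() + ".state");
    }
}
```

Aggregates that do not implement the interface keep going through the generic path, so the override is purely additive.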
  request.tables(),
- System.nanoTime()
+ System.nanoTime(),
+ Configuration.calculateActiveClusterFeatures(featureService, clusterService)
This is where we inject the features into the Configuration object of the query
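As an illustration of what "active" could mean here, the sketch below assumes a feature counts as active only when every node in the cluster reports it. That rule, the `activeOnEveryNode` helper, and the plain `Set<String>` representation are assumptions for the example, not the signature or behavior of `calculateActiveClusterFeatures`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

final class ActiveFeaturesSketch {
    /** Hypothetical helper: intersects the feature sets reported by each node. */
    static Set<String> activeOnEveryNode(Collection<Set<String>> featuresPerNode) {
        Iterator<Set<String>> nodes = featuresPerNode.iterator();
        if (nodes.hasNext() == false) {
            return Set.of();
        }
        Set<String> active = new HashSet<>(nodes.next());
        while (nodes.hasNext()) {
            active.retainAll(nodes.next()); // drop anything a later node lacks
        }
        return Set.copyOf(active);
    }
}
```

Under that assumption, a mixed-version cluster with one node that lacks the feature would yield a set without it, and the query would fall back to the old aggregator.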
A clone of the AbstractConfigurationScalarFunctionTestCase
 * </p>
 * <p>
 *     Can't be removed, as the new aggregator's layout is different.
 * </p>
It can be removed once we no longer need wire compatibility with versions before this PR.
required_capability: fn_sum_overflow_handling
FROM employees
| LIMIT 2
| EVAL x = languages - languages + 9223372036854775807
Should we multiply languages by a big number? That feels less likely to get optimized away one day.
Uhm yeah, I'll remove that cheap trick
- public Sum(Source source, Expression field, Expression filter) {
-     super(source, field, filter, emptyList());
+ public Sum(Source source, Expression field, Expression filter, Configuration configuration) {
+     super(source, field, filter, emptyList(), configuration);
I was thinking you'd read the value from the configuration in the ctor rather than keep the configuration around. Heck - in this public ctor you could just pass the boolean and read it from the configuration in the one above. That way you could call this one in tests if you wanted.
I was planning to do that, but as scalar functions already have this configuration stored, I thought it was better to keep the same mechanism (?)
Edit: Your next comment digs deeper in the solution, so maybe ignore this comment for now
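For reference, the two-constructor shape suggested in this thread might look roughly like the sketch below. The trimmed-down `Configuration` record, the `String` field, the feature id string, and `describe()` are all hypothetical simplifications, not the real ESQL classes.

```java
import java.util.Set;

final class SumCtorSketch {
    /** Hypothetical, trimmed-down configuration exposing only the feature check. */
    record Configuration(Set<String> activeFeatures) {
        boolean clusterHasFeature(String feature) {
            return activeFeatures.contains(feature);
        }
    }

    static final class Sum {
        private final String field;
        private final boolean overflowHandling;

        /** Planner-facing constructor: reads the flag once and does not keep the configuration. */
        Sum(String field, Configuration configuration) {
            this(field, configuration.clusterHasFeature("fn_sum_overflow_handling"));
        }

        /** Test-friendly constructor: the decision is passed in directly. */
        Sum(String field, boolean overflowHandling) {
            this.field = field;
            this.overflowHandling = overflowHandling;
        }

        /** Exposes the decision so it can be inspected when debugging. */
        String describe() {
            return "SUM(" + field + ")[overflowHandling=" + overflowHandling + "]";
        }
    }
}
```

The trade-off is that the function carries one boolean instead of a reference to the whole configuration, and tests can construct either variant without building a configuration.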
    this.activeEsqlFeatures = in.readCollectionAsImmutableSet(StreamInput::readString);
} else {
    this.activeEsqlFeatures = Set.of();
}
I wonder if we could avoid serializing this. Like, maybe it doesn't belong in the configuration - maybe it's another parameter that we add to FunctionDefinition.Builder or something.
I feel like we'd be better off reading from the Set<NodeFeatures> that are active in the ctor of the Function and setting a boolean. That feels like it'd be easier to debug and test and the functions could have that boolean in their descriptions so even in profile we could see what it decided to do. I feel like there aren't going to be that many of these functions with behavior changes so the extra booleans here and there are ok. And the "you can see what it chose" behavior is nicer than serializing all of these strings.
Also! I kind of like the idea of choosing at function resolution time more than copying the list of features, saving it, and choosing at function translation time. It amounts to the same thing but it feels better in my heart.
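For context on what would be avoided: the snippet under discussion reads the feature set conditionally and falls back to an empty set. Assuming the elided condition is a wire-version check, the general pattern can be sketched with plain `java.io` streams as below. The version constant and class name are hypothetical, and this does not use Elasticsearch's `StreamInput`/`StreamOutput`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

final class FeatureWireSketch {
    // Hypothetical wire version that first carries the feature set.
    static final int FEATURES_ADDED_VERSION = 2;

    static byte[] write(Set<String> features, int peerVersion) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Older peers do not expect the field, so it is only written for newer ones.
            if (peerVersion >= FEATURES_ADDED_VERSION) {
                out.writeInt(features.size());
                for (String feature : features) {
                    out.writeUTF(feature);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Set<String> read(byte[] payload, int peerVersion) {
        // Payloads from older peers carry no feature set; treat it as empty.
        if (peerVersion < FEATURES_ADDED_VERSION) {
            return Set.of();
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            int size = in.readInt();
            Set<String> features = new HashSet<>();
            for (int i = 0; i < size; i++) {
                features.add(in.readUTF());
            }
            return Set.copyOf(features);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Resolving the choice to a boolean at function resolution time, as suggested above, would make this extra field unnecessary.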
Closing as we're not actively working on this, and there's a dependency on CCS features
Fixes #110443
This PR does some things:
- warnExceptions for SumLongAggregator (This change can be partially seen at ESQL: Prevent overflow on SUM aggregation #111639)
- OverflowingSumLongAggregator aggregator to be used depending on the features
To do
- Cluster features don't work with CCS, so a different solution will be needed