Skip to content

Commit 376fa8d

Browse files
authored
ESQL: Fix filtered grouping on ords (#115312) (#115455)
This fixes filtered aggs when they are grouped on a field with ordinals. This looks like: ``` | STATS max = max(salary) WHERE salary > 0 BY job_positions ``` when the `job_positions` field is a keyword field with doc values. In that case we use a faster group-by-segment-ordinals algorithm that needs to be able to merge the results of aggregators from multiple segments. This previously failed with a `ClassCastException` because of a mistake. Also! the group-by-segment-ordinals algorithm wasn't properly releasing the closure used to add inputs, causing a breaker size leak. This wasn't really leaking memory, but leaking *tracking* of memory. Closes #114897
1 parent f39aba4 commit 376fa8d

File tree

8 files changed

+183
-7
lines changed

8 files changed

+183
-7
lines changed

docs/changelog/115312.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115312
2+
summary: "ESQL: Fix filtered grouping on ords"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 114897

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void addIntermediateInput(int positionOffset, IntVector groupIdVector, Pa
9797

9898
@Override
9999
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
100-
next.addIntermediateRowInput(groupId, input, position);
100+
next.addIntermediateRowInput(groupId, ((FilteredGroupingAggregatorFunction) input).next(), position);
101101
}
102102

103103
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,8 @@ static final class OrdinalSegmentAggregator implements Releasable, SeenGroupIds
372372
}
373373

374374
void addInput(IntVector docs, Page page) {
375+
GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
375376
try {
376-
GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
377377
for (int i = 0; i < prepared.length; i++) {
378378
prepared[i] = aggregators.get(i).prepareProcessPage(this, page);
379379
}
@@ -392,7 +392,7 @@ void addInput(IntVector docs, Page page) {
392392
} catch (IOException e) {
393393
throw new UncheckedIOException(e);
394394
} finally {
395-
page.releaseBlocks();
395+
Releasables.close(page::releaseBlocks, Releasables.wrap(prepared));
396396
}
397397
}
398398

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
public class FilteredAggregatorFunctionTests extends AggregatorFunctionTestCase {
2828
private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());
2929

30-
// TODO some version of this test that applies across all aggs
3130
@Override
3231
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
3332
return new FilteredAggregatorFunctionSupplier(

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunctionTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
import org.elasticsearch.compute.data.BlockFactory;
1212
import org.elasticsearch.compute.data.BooleanVector;
1313
import org.elasticsearch.compute.data.IntBlock;
14+
import org.elasticsearch.compute.data.IntVector;
1415
import org.elasticsearch.compute.data.LongBlock;
1516
import org.elasticsearch.compute.data.Page;
1617
import org.elasticsearch.compute.operator.DriverContext;
1718
import org.elasticsearch.compute.operator.EvalOperator;
1819
import org.elasticsearch.compute.operator.LongIntBlockSourceOperator;
1920
import org.elasticsearch.compute.operator.SourceOperator;
21+
import org.elasticsearch.core.Releasables;
2022
import org.elasticsearch.core.Tuple;
2123
import org.junit.After;
2224

@@ -31,7 +33,6 @@
3133
public class FilteredGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
3234
private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());
3335

34-
// TODO some version of this test that applies across all aggs
3536
@Override
3637
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
3738
return new FilteredAggregatorFunctionSupplier(
@@ -104,6 +105,42 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
104105
);
105106
}
106107

108+
/**
109+
* Tests {@link GroupingAggregator#addIntermediateRow} by building results using the traditional
110+
* add mechanism and using {@link GroupingAggregator#addIntermediateRow} then asserting that they
111+
* produce the same output.
112+
*/
113+
public void testAddIntermediateRowInput() {
114+
DriverContext ctx = driverContext();
115+
AggregatorFunctionSupplier supplier = aggregatorFunction(channels(AggregatorMode.SINGLE));
116+
Block[] results = new Block[2];
117+
try (
118+
GroupingAggregatorFunction main = supplier.groupingAggregator(ctx);
119+
GroupingAggregatorFunction leaf = supplier.groupingAggregator(ctx);
120+
SourceOperator source = simpleInput(ctx.blockFactory(), 10);
121+
) {
122+
Page p;
123+
while ((p = source.getOutput()) != null) {
124+
try (
125+
IntVector group = ctx.blockFactory().newConstantIntVector(0, p.getPositionCount());
126+
GroupingAggregatorFunction.AddInput addInput = leaf.prepareProcessPage(null, p)
127+
) {
128+
addInput.add(0, group);
129+
} finally {
130+
p.releaseBlocks();
131+
}
132+
}
133+
main.addIntermediateRowInput(0, leaf, 0);
134+
try (IntVector selected = ctx.blockFactory().newConstantIntVector(0, 1)) {
135+
main.evaluateFinal(results, 0, selected, ctx);
136+
leaf.evaluateFinal(results, 1, selected, ctx);
137+
}
138+
assertThat(results[0], equalTo(results[1]));
139+
} finally {
140+
Releasables.close(results);
141+
}
142+
}
143+
107144
@After
108145
public void checkUnclosed() {
109146
for (Exception tracker : unclosed) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,17 @@ protected final Operator.OperatorFactory simpleWithMode(AggregatorMode mode) {
8989
return simpleWithMode(mode, Function.identity());
9090
}
9191

92+
protected List<Integer> channels(AggregatorMode mode) {
93+
return mode.isInputPartial() ? range(1, 1 + aggregatorIntermediateBlockCount()).boxed().toList() : List.of(1);
94+
}
95+
9296
private Operator.OperatorFactory simpleWithMode(
9397
AggregatorMode mode,
9498
Function<AggregatorFunctionSupplier, AggregatorFunctionSupplier> wrap
9599
) {
96-
List<Integer> channels = mode.isInputPartial() ? range(1, 1 + aggregatorIntermediateBlockCount()).boxed().toList() : List.of(1);
97100
int emitChunkSize = between(100, 200);
98101

99-
AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels));
102+
AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels(mode)));
100103
if (randomBoolean()) {
101104
supplier = chunkGroups(emitChunkSize, supplier);
102105
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2529,3 +2529,129 @@ FROM employees | eval x = [1,2,3], y = 5 + 6 | stats m = max(y) by y+1
25292529
m:integer | y+1:integer
25302530
11 | 12
25312531
;
2532+
2533+
filterIsAlwaysTrue
2534+
required_capability: per_agg_filtering
2535+
FROM employees
2536+
| STATS max = max(salary) WHERE salary > 0
2537+
;
2538+
2539+
max:integer
2540+
74999
2541+
;
2542+
2543+
filterIsAlwaysFalse
2544+
required_capability: per_agg_filtering
2545+
FROM employees
2546+
| STATS max = max(salary) WHERE first_name == ""
2547+
;
2548+
2549+
max:integer
2550+
null
2551+
;
2552+
2553+
filterSometimesMatches
2554+
required_capability: per_agg_filtering
2555+
FROM employees
2556+
| STATS max = max(salary) WHERE first_name IS NULL
2557+
;
2558+
2559+
max:integer
2560+
70011
2561+
;
2562+
2563+
groupingFilterIsAlwaysTrue
2564+
required_capability: per_agg_filtering
2565+
FROM employees
2566+
| MV_EXPAND job_positions
2567+
| STATS max = max(salary) WHERE salary > 0 BY job_positions = SUBSTRING(job_positions, 1, 1)
2568+
| SORT job_positions
2569+
| LIMIT 4
2570+
;
2571+
2572+
max:integer | job_positions:keyword
2573+
74970 | A
2574+
58121 | B
2575+
74999 | D
2576+
58715 | H
2577+
;
2578+
2579+
groupingFilterIsAlwaysFalse
2580+
required_capability: per_agg_filtering
2581+
FROM employees
2582+
| MV_EXPAND job_positions
2583+
| STATS max = max(salary) WHERE first_name == "" BY job_positions = SUBSTRING(job_positions, 1, 1)
2584+
| SORT job_positions
2585+
| LIMIT 4
2586+
;
2587+
2588+
max:integer | job_positions:keyword
2589+
null | A
2590+
null | B
2591+
null | D
2592+
null | H
2593+
;
2594+
2595+
groupingFilterSometimesMatches
2596+
required_capability: per_agg_filtering
2597+
FROM employees
2598+
| MV_EXPAND job_positions
2599+
| STATS max = max(salary) WHERE first_name IS NULL BY job_positions = SUBSTRING(job_positions, 1, 1)
2600+
| SORT job_positions
2601+
| LIMIT 4
2602+
;
2603+
2604+
max:integer | job_positions:keyword
2605+
62233 | A
2606+
39878 | B
2607+
67492 | D
2608+
null | H
2609+
;
2610+
2611+
groupingByOrdinalsFilterIsAlwaysTrue
2612+
required_capability: per_agg_filtering
2613+
required_capability: per_agg_filtering_ords
2614+
FROM employees
2615+
| STATS max = max(salary) WHERE salary > 0 BY job_positions
2616+
| SORT job_positions
2617+
| LIMIT 4
2618+
;
2619+
2620+
max:integer | job_positions:keyword
2621+
74970 | Accountant
2622+
69904 | Architect
2623+
58121 | Business Analyst
2624+
74999 | Data Scientist
2625+
;
2626+
2627+
groupingByOrdinalsFilterIsAlwaysFalse
2628+
required_capability: per_agg_filtering
2629+
required_capability: per_agg_filtering_ords
2630+
FROM employees
2631+
| STATS max = max(salary) WHERE first_name == "" BY job_positions
2632+
| SORT job_positions
2633+
| LIMIT 4
2634+
;
2635+
2636+
max:integer | job_positions:keyword
2637+
null | Accountant
2638+
null | Architect
2639+
null | Business Analyst
2640+
null | Data Scientist
2641+
;
2642+
2643+
groupingByOrdinalsFilterSometimesMatches
2644+
required_capability: per_agg_filtering
2645+
required_capability: per_agg_filtering_ords
2646+
FROM employees
2647+
| STATS max = max(salary) WHERE first_name IS NULL BY job_positions
2648+
| SORT job_positions
2649+
| LIMIT 4
2650+
;
2651+
2652+
max:integer | job_positions:keyword
2653+
39878 | Accountant
2654+
62233 | Architect
2655+
39878 | Business Analyst
2656+
67492 | Data Scientist
2657+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,11 @@ public enum Cap {
393393
*/
394394
PER_AGG_FILTERING,
395395

396+
/**
397+
* Fix {@link #PER_AGG_FILTERING} grouped by ordinals.
398+
*/
399+
PER_AGG_FILTERING_ORDS,
400+
396401
/**
397402
* Fix for https://github.com/elastic/elasticsearch/issues/114714
398403
*/

0 commit comments

Comments
 (0)