Skip to content

Commit ee3ba11

Browse files
authored
ESQL: Fix filtered grouping on ords (#115312) (#115464)
This fixes filtered aggs when they are grouped on a field with ordinals. This looks like: ``` | STATS max = max(salary) WHERE salary > 0 BY job_positions ``` when the `job_positions` field is a keyword field with doc values. In that case we use a faster group-by-segment-ordinals algorithm that needs to be able to merge the results of aggregators from multiple segments. This previously failed with a `ClassCastException` because of a mistake. Also! the group-by-segment-ordinals algorithm wasn't properly releasing the closure used to add inputs, causing a breaker size leak. This wasn't really leaking memory, but leaking *tracking* of memory. Closes #114897
1 parent 211260f commit ee3ba11

File tree

8 files changed

+183
-7
lines changed

8 files changed

+183
-7
lines changed

docs/changelog/115312.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 115312
2+
summary: "ESQL: Fix filtered grouping on ords"
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 114897

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void addIntermediateInput(int positionOffset, IntVector groupIdVector, Pa
9797

9898
@Override
9999
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
100-
next.addIntermediateRowInput(groupId, input, position);
100+
next.addIntermediateRowInput(groupId, ((FilteredGroupingAggregatorFunction) input).next(), position);
101101
}
102102

103103
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,8 @@ static final class OrdinalSegmentAggregator implements Releasable, SeenGroupIds
372372
}
373373

374374
void addInput(IntVector docs, Page page) {
375+
GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
375376
try {
376-
GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
377377
for (int i = 0; i < prepared.length; i++) {
378378
prepared[i] = aggregators.get(i).prepareProcessPage(this, page);
379379
}
@@ -392,7 +392,7 @@ void addInput(IntVector docs, Page page) {
392392
} catch (IOException e) {
393393
throw new UncheckedIOException(e);
394394
} finally {
395-
page.releaseBlocks();
395+
Releasables.close(page::releaseBlocks, Releasables.wrap(prepared));
396396
}
397397
}
398398

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
public class FilteredAggregatorFunctionTests extends AggregatorFunctionTestCase {
2828
private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());
2929

30-
// TODO some version of this test that applies across all aggs
3130
@Override
3231
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
3332
return new FilteredAggregatorFunctionSupplier(

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunctionTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
import org.elasticsearch.compute.data.BlockFactory;
1212
import org.elasticsearch.compute.data.BooleanVector;
1313
import org.elasticsearch.compute.data.IntBlock;
14+
import org.elasticsearch.compute.data.IntVector;
1415
import org.elasticsearch.compute.data.LongBlock;
1516
import org.elasticsearch.compute.data.Page;
1617
import org.elasticsearch.compute.operator.DriverContext;
1718
import org.elasticsearch.compute.operator.EvalOperator;
1819
import org.elasticsearch.compute.operator.LongIntBlockSourceOperator;
1920
import org.elasticsearch.compute.operator.SourceOperator;
21+
import org.elasticsearch.core.Releasables;
2022
import org.elasticsearch.core.Tuple;
2123
import org.junit.After;
2224

@@ -31,7 +33,6 @@
3133
public class FilteredGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
3234
private final List<Exception> unclosed = Collections.synchronizedList(new ArrayList<>());
3335

34-
// TODO some version of this test that applies across all aggs
3536
@Override
3637
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
3738
return new FilteredAggregatorFunctionSupplier(
@@ -104,6 +105,42 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
104105
);
105106
}
106107

108+
/**
109+
* Tests {@link GroupingAggregator#addIntermediateRow} by building results using the traditional
110+
* add mechanism and using {@link GroupingAggregator#addIntermediateRow} then asserting that they
111+
* produce the same output.
112+
*/
113+
public void testAddIntermediateRowInput() {
114+
DriverContext ctx = driverContext();
115+
AggregatorFunctionSupplier supplier = aggregatorFunction(channels(AggregatorMode.SINGLE));
116+
Block[] results = new Block[2];
117+
try (
118+
GroupingAggregatorFunction main = supplier.groupingAggregator(ctx);
119+
GroupingAggregatorFunction leaf = supplier.groupingAggregator(ctx);
120+
SourceOperator source = simpleInput(ctx.blockFactory(), 10);
121+
) {
122+
Page p;
123+
while ((p = source.getOutput()) != null) {
124+
try (
125+
IntVector group = ctx.blockFactory().newConstantIntVector(0, p.getPositionCount());
126+
GroupingAggregatorFunction.AddInput addInput = leaf.prepareProcessPage(null, p)
127+
) {
128+
addInput.add(0, group);
129+
} finally {
130+
p.releaseBlocks();
131+
}
132+
}
133+
main.addIntermediateRowInput(0, leaf, 0);
134+
try (IntVector selected = ctx.blockFactory().newConstantIntVector(0, 1)) {
135+
main.evaluateFinal(results, 0, selected, ctx);
136+
leaf.evaluateFinal(results, 1, selected, ctx);
137+
}
138+
assertThat(results[0], equalTo(results[1]));
139+
} finally {
140+
Releasables.close(results);
141+
}
142+
}
143+
107144
@After
108145
public void checkUnclosed() {
109146
for (Exception tracker : unclosed) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,17 @@ protected final Operator.OperatorFactory simpleWithMode(AggregatorMode mode) {
8989
return simpleWithMode(mode, Function.identity());
9090
}
9191

92+
protected List<Integer> channels(AggregatorMode mode) {
93+
return mode.isInputPartial() ? range(1, 1 + aggregatorIntermediateBlockCount()).boxed().toList() : List.of(1);
94+
}
95+
9296
private Operator.OperatorFactory simpleWithMode(
9397
AggregatorMode mode,
9498
Function<AggregatorFunctionSupplier, AggregatorFunctionSupplier> wrap
9599
) {
96-
List<Integer> channels = mode.isInputPartial() ? range(1, 1 + aggregatorIntermediateBlockCount()).boxed().toList() : List.of(1);
97100
int emitChunkSize = between(100, 200);
98101

99-
AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels));
102+
AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels(mode)));
100103
if (randomBoolean()) {
101104
supplier = chunkGroups(emitChunkSize, supplier);
102105
}

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2500,3 +2500,129 @@ c2:l |c2_f:l |m2:i |m2_f:i |c:l
25002500
1 |0 |2 |2 |19
25012501
1 |1 |5 |5 |21
25022502
;
2503+
2504+
filterIsAlwaysTrue
2505+
required_capability: per_agg_filtering
2506+
FROM employees
2507+
| STATS max = max(salary) WHERE salary > 0
2508+
;
2509+
2510+
max:integer
2511+
74999
2512+
;
2513+
2514+
filterIsAlwaysFalse
2515+
required_capability: per_agg_filtering
2516+
FROM employees
2517+
| STATS max = max(salary) WHERE first_name == ""
2518+
;
2519+
2520+
max:integer
2521+
null
2522+
;
2523+
2524+
filterSometimesMatches
2525+
required_capability: per_agg_filtering
2526+
FROM employees
2527+
| STATS max = max(salary) WHERE first_name IS NULL
2528+
;
2529+
2530+
max:integer
2531+
70011
2532+
;
2533+
2534+
groupingFilterIsAlwaysTrue
2535+
required_capability: per_agg_filtering
2536+
FROM employees
2537+
| MV_EXPAND job_positions
2538+
| STATS max = max(salary) WHERE salary > 0 BY job_positions = SUBSTRING(job_positions, 1, 1)
2539+
| SORT job_positions
2540+
| LIMIT 4
2541+
;
2542+
2543+
max:integer | job_positions:keyword
2544+
74970 | A
2545+
58121 | B
2546+
74999 | D
2547+
58715 | H
2548+
;
2549+
2550+
groupingFilterIsAlwaysFalse
2551+
required_capability: per_agg_filtering
2552+
FROM employees
2553+
| MV_EXPAND job_positions
2554+
| STATS max = max(salary) WHERE first_name == "" BY job_positions = SUBSTRING(job_positions, 1, 1)
2555+
| SORT job_positions
2556+
| LIMIT 4
2557+
;
2558+
2559+
max:integer | job_positions:keyword
2560+
null | A
2561+
null | B
2562+
null | D
2563+
null | H
2564+
;
2565+
2566+
groupingFilterSometimesMatches
2567+
required_capability: per_agg_filtering
2568+
FROM employees
2569+
| MV_EXPAND job_positions
2570+
| STATS max = max(salary) WHERE first_name IS NULL BY job_positions = SUBSTRING(job_positions, 1, 1)
2571+
| SORT job_positions
2572+
| LIMIT 4
2573+
;
2574+
2575+
max:integer | job_positions:keyword
2576+
62233 | A
2577+
39878 | B
2578+
67492 | D
2579+
null | H
2580+
;
2581+
2582+
groupingByOrdinalsFilterIsAlwaysTrue
2583+
required_capability: per_agg_filtering
2584+
required_capability: per_agg_filtering_ords
2585+
FROM employees
2586+
| STATS max = max(salary) WHERE salary > 0 BY job_positions
2587+
| SORT job_positions
2588+
| LIMIT 4
2589+
;
2590+
2591+
max:integer | job_positions:keyword
2592+
74970 | Accountant
2593+
69904 | Architect
2594+
58121 | Business Analyst
2595+
74999 | Data Scientist
2596+
;
2597+
2598+
groupingByOrdinalsFilterIsAlwaysFalse
2599+
required_capability: per_agg_filtering
2600+
required_capability: per_agg_filtering_ords
2601+
FROM employees
2602+
| STATS max = max(salary) WHERE first_name == "" BY job_positions
2603+
| SORT job_positions
2604+
| LIMIT 4
2605+
;
2606+
2607+
max:integer | job_positions:keyword
2608+
null | Accountant
2609+
null | Architect
2610+
null | Business Analyst
2611+
null | Data Scientist
2612+
;
2613+
2614+
groupingByOrdinalsFilterSometimesMatches
2615+
required_capability: per_agg_filtering
2616+
required_capability: per_agg_filtering_ords
2617+
FROM employees
2618+
| STATS max = max(salary) WHERE first_name IS NULL BY job_positions
2619+
| SORT job_positions
2620+
| LIMIT 4
2621+
;
2622+
2623+
max:integer | job_positions:keyword
2624+
39878 | Accountant
2625+
62233 | Architect
2626+
39878 | Business Analyst
2627+
67492 | Data Scientist
2628+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,11 @@ public enum Cap {
393393
*/
394394
PER_AGG_FILTERING,
395395

396+
/**
397+
* Fix {@link #PER_AGG_FILTERING} grouped by ordinals.
398+
*/
399+
PER_AGG_FILTERING_ORDS,
400+
396401
/**
397402
* Fix for an optimization that caused wrong results
398403
* https://github.com/elastic/elasticsearch/issues/115281

0 commit comments

Comments
 (0)