Skip to content

Commit 0e2f832

Browse files
authored
ESQL: Test partially filtered aggs (#114510) (#114654)
Adds tests for partially filtered aggs. They reuse the existing aggs tests and add junk rows that are filtered away. That way we don't have to add new testing assertions to each class - we can just reuse the existing assertions.
1 parent 1530802 commit 0e2f832

File tree

4 files changed

+181
-1
lines changed

4 files changed

+181
-1
lines changed

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.compute.data.LongBlock;
2323
import org.elasticsearch.compute.data.Page;
2424
import org.elasticsearch.compute.data.TestBlockFactory;
25+
import org.elasticsearch.compute.operator.AddGarbageRowsSourceOperator;
2526
import org.elasticsearch.compute.operator.AggregationOperator;
2627
import org.elasticsearch.compute.operator.CannedSourceOperator;
2728
import org.elasticsearch.compute.operator.Driver;
@@ -203,6 +204,22 @@ public void testNoneFiltered() {
203204
assertSimpleOutput(origInput, results);
204205
}
205206

207+
public void testSomeFiltered() {
208+
Operator.OperatorFactory factory = simpleWithMode(
209+
AggregatorMode.SINGLE,
210+
agg -> new FilteredAggregatorFunctionSupplier(agg, AddGarbageRowsSourceOperator.filterFactory())
211+
);
212+
DriverContext driverContext = driverContext();
213+
// Build the test data
214+
List<Page> input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 10));
215+
List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());
216+
// Sprinkle garbage into it
217+
input = CannedSourceOperator.collectPages(new AddGarbageRowsSourceOperator(new CannedSourceOperator(input.iterator())));
218+
List<Page> results = drive(factory.get(driverContext), input.iterator(), driverContext);
219+
assertThat(results, hasSize(1));
220+
assertSimpleOutput(origInput, results);
221+
}
222+
206223
// Returns an intermediate state that is equivalent to what the local execution planner will emit
207224
// if it determines that certain shards have no relevant data.
208225
List<Page> nullIntermediateState(BlockFactory blockFactory) {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,9 @@ public void testNoneFiltered() {
103103
public void testAllFiltered() {
104104
assumeFalse("can't double filter. tests already filter.", true);
105105
}
106+
107+
// Overrides the inherited test with one that never runs its body: the assumption
// below is passed a constant true to assumeFalse. Per its message, the tests in
// this class already apply a filter, so they can't be filtered a second time.
@Override
108+
public void testSomeFiltered() {
109+
assumeFalse("can't double filter. tests already filter.", true);
110+
}
106111
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.compute.data.LongVector;
2727
import org.elasticsearch.compute.data.Page;
2828
import org.elasticsearch.compute.data.TestBlockFactory;
29+
import org.elasticsearch.compute.operator.AddGarbageRowsSourceOperator;
2930
import org.elasticsearch.compute.operator.CannedSourceOperator;
3031
import org.elasticsearch.compute.operator.DriverContext;
3132
import org.elasticsearch.compute.operator.ForkingOperatorTestCase;
@@ -53,6 +54,7 @@
5354
import static org.elasticsearch.compute.data.BlockTestUtils.append;
5455
import static org.hamcrest.Matchers.equalTo;
5556
import static org.hamcrest.Matchers.hasSize;
57+
import static org.hamcrest.Matchers.in;
5658

5759
/**
5860
* Shared tests for testing grouped aggregations.
@@ -160,11 +162,17 @@ protected long randomGroupId(int pageSize) {
160162

161163
// Delegates to the three-argument overload with assertGroupCount = true, so the
// number of output positions is also compared against the number of groups seen
// in the input.
@Override
162164
protected final void assertSimpleOutput(List<Page> input, List<Page> results) {
165+
assertSimpleOutput(input, results, true);
166+
}
167+
168+
private void assertSimpleOutput(List<Page> input, List<Page> results, boolean assertGroupCount) {
163169
SeenGroups seenGroups = seenGroups(input);
164170

165171
assertThat(results, hasSize(1));
166172
assertThat(results.get(0).getBlockCount(), equalTo(2));
167-
assertThat(results.get(0).getPositionCount(), equalTo(seenGroups.size()));
173+
if (assertGroupCount) {
174+
assertThat(results.get(0).getPositionCount(), equalTo(seenGroups.size()));
175+
}
168176

169177
Block groups = results.get(0).getBlock(0);
170178
Block result = results.get(0).getBlock(1);
@@ -394,6 +402,23 @@ public final void testNoneFiltered() {
394402
assertSimpleOutput(origInput, results);
395403
}
396404

405+
public void testSomeFiltered() {
406+
Operator.OperatorFactory factory = simpleWithMode(
407+
AggregatorMode.SINGLE,
408+
agg -> new FilteredAggregatorFunctionSupplier(agg, AddGarbageRowsSourceOperator.filterFactory())
409+
);
410+
DriverContext driverContext = driverContext();
411+
// Build the test data
412+
List<Page> input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 10));
413+
List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());
414+
// Sprinkle garbage into it
415+
input = CannedSourceOperator.collectPages(new AddGarbageRowsSourceOperator(new CannedSourceOperator(input.iterator())));
416+
List<Page> results = drive(factory.get(driverContext), input.iterator(), driverContext);
417+
assertThat(results, hasSize(1));
418+
419+
assertSimpleOutput(origInput, results, false);
420+
}
421+
397422
/**
398423
* Asserts that the output from an empty input is a {@link Block} containing
399424
* only {@code null}. Override for {@code count} style aggregations that
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BooleanBlock;
13+
import org.elasticsearch.compute.data.BytesRefBlock;
14+
import org.elasticsearch.compute.data.DoubleBlock;
15+
import org.elasticsearch.compute.data.FloatBlock;
16+
import org.elasticsearch.compute.data.IntBlock;
17+
import org.elasticsearch.compute.data.LongBlock;
18+
import org.elasticsearch.compute.data.Page;
19+
import org.elasticsearch.core.Releasables;
20+
import org.elasticsearch.test.ESTestCase;
21+
22+
/**
23+
* A {@link SourceOperator} that inserts random garbage into data from another
24+
* {@link SourceOperator}. It also inserts an extra channel at the end of the page
25+
* containing a {@code boolean} column. If it is {@code true} then the data came
26+
* from the original operator. If it's {@code false} then the data is random
27+
* garbage inserted by this operator.
28+
*/
29+
public class AddGarbageRowsSourceOperator extends SourceOperator {
30+
public static EvalOperator.ExpressionEvaluator.Factory filterFactory() {
31+
/*
32+
* Grabs the filter from the last block. That's where we put it.
33+
*/
34+
return ctx -> new EvalOperator.ExpressionEvaluator() {
35+
@Override
36+
public Block eval(Page page) {
37+
Block block = page.getBlock(page.getBlockCount() - 1);
38+
block.incRef();
39+
return block;
40+
}
41+
42+
@Override
43+
public void close() {}
44+
};
45+
}
46+
47+
private final SourceOperator next;
48+
49+
public AddGarbageRowsSourceOperator(SourceOperator next) {
50+
this.next = next;
51+
}
52+
53+
@Override
54+
public void finish() {
55+
next.finish();
56+
}
57+
58+
@Override
59+
public boolean isFinished() {
60+
return next.isFinished();
61+
}
62+
63+
@Override
64+
public Page getOutput() {
65+
Page page = next.getOutput();
66+
if (page == null) {
67+
return null;
68+
}
69+
Block.Builder[] newBlocks = new Block.Builder[page.getBlockCount() + 1];
70+
try {
71+
for (int b = 0; b < page.getBlockCount(); b++) {
72+
Block block = page.getBlock(b);
73+
newBlocks[b] = block.elementType().newBlockBuilder(page.getPositionCount(), block.blockFactory());
74+
}
75+
newBlocks[page.getBlockCount()] = page.getBlock(0).blockFactory().newBooleanBlockBuilder(page.getPositionCount());
76+
77+
for (int p = 0; p < page.getPositionCount(); p++) {
78+
if (ESTestCase.randomBoolean()) {
79+
insertGarbageRows(newBlocks, page);
80+
}
81+
copyPosition(newBlocks, page, p);
82+
if (ESTestCase.randomBoolean()) {
83+
insertGarbageRows(newBlocks, page);
84+
}
85+
}
86+
87+
return new Page(Block.Builder.buildAll(newBlocks));
88+
} finally {
89+
Releasables.close(Releasables.wrap(newBlocks), page::releaseBlocks);
90+
}
91+
}
92+
93+
private void copyPosition(Block.Builder[] newBlocks, Page page, int p) {
94+
for (int b = 0; b < page.getBlockCount(); b++) {
95+
Block block = page.getBlock(b);
96+
newBlocks[b].copyFrom(block, p, p + 1);
97+
}
98+
signalKeep(newBlocks, true);
99+
}
100+
101+
private void insertGarbageRows(Block.Builder[] newBlocks, Page page) {
102+
int count = ESTestCase.between(1, 5);
103+
for (int c = 0; c < count; c++) {
104+
insertGarbageRow(newBlocks, page);
105+
}
106+
}
107+
108+
private void insertGarbageRow(Block.Builder[] newBlocks, Page page) {
109+
for (int b = 0; b < page.getBlockCount(); b++) {
110+
Block block = page.getBlock(b);
111+
switch (block.elementType()) {
112+
case BOOLEAN -> ((BooleanBlock.Builder) newBlocks[b]).appendBoolean(ESTestCase.randomBoolean());
113+
case BYTES_REF -> ((BytesRefBlock.Builder) newBlocks[b]).appendBytesRef(new BytesRef(ESTestCase.randomAlphaOfLength(5)));
114+
case COMPOSITE, DOC, UNKNOWN -> throw new UnsupportedOperationException();
115+
case INT -> ((IntBlock.Builder) newBlocks[b]).appendInt(ESTestCase.randomInt());
116+
case LONG -> ((LongBlock.Builder) newBlocks[b]).appendLong(ESTestCase.randomLong());
117+
case NULL -> newBlocks[b].appendNull();
118+
case DOUBLE -> ((DoubleBlock.Builder) newBlocks[b]).appendDouble(ESTestCase.randomDouble());
119+
case FLOAT -> ((FloatBlock.Builder) newBlocks[b]).appendFloat(ESTestCase.randomFloat());
120+
}
121+
}
122+
signalKeep(newBlocks, false);
123+
}
124+
125+
private void signalKeep(Block.Builder[] newBlocks, boolean shouldKeep) {
126+
((BooleanBlock.Builder) newBlocks[newBlocks.length - 1]).appendBoolean(shouldKeep);
127+
}
128+
129+
@Override
130+
public void close() {
131+
next.close();
132+
}
133+
}

0 commit comments

Comments
 (0)