Skip to content

Commit 42beb50

Browse files
committed
Chunk hash aggregation output
1 parent b619e14 commit 42beb50

File tree

8 files changed

+225
-49
lines changed

8 files changed

+225
-49
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,11 @@ private static Operator operator(DriverContext driverContext, String grouping, S
181181
);
182182
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
183183
};
184+
int pageSize = 16 * 1024;
184185
return new HashAggregationOperator(
185186
List.of(supplier(op, dataType, filter).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(groups.size()))),
186-
() -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false),
187+
() -> BlockHash.build(groups, driverContext.blockFactory(), pageSize, false),
188+
pageSize,
187189
driverContext
188190
);
189191
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.compute.data.IntBlock;
2323
import org.elasticsearch.compute.data.IntVector;
2424
import org.elasticsearch.compute.data.Page;
25+
import org.elasticsearch.core.Releasable;
2526
import org.elasticsearch.core.Releasables;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -58,12 +59,14 @@ public Operator get(DriverContext driverContext) {
5859
analysisRegistry,
5960
maxPageSize
6061
),
62+
maxPageSize,
6163
driverContext
6264
);
6365
}
6466
return new HashAggregationOperator(
6567
aggregators,
6668
() -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false),
69+
maxPageSize,
6770
driverContext
6871
);
6972
}
@@ -78,9 +81,10 @@ public String describe() {
7881
}
7982
}
8083

81-
private boolean finished;
82-
private Page output;
84+
private final int maxPageSize;
85+
private Emitter emitter;
8386

87+
private boolean blockHashClosed = false;
8488
private final BlockHash blockHash;
8589

8690
private final List<GroupingAggregator> aggregators;
@@ -112,8 +116,10 @@ public String describe() {
112116
public HashAggregationOperator(
113117
List<GroupingAggregator.Factory> aggregators,
114118
Supplier<BlockHash> blockHash,
119+
int maxPageSize,
115120
DriverContext driverContext
116121
) {
122+
this.maxPageSize = maxPageSize;
117123
this.aggregators = new ArrayList<>(aggregators.size());
118124
this.driverContext = driverContext;
119125
boolean success = false;
@@ -132,7 +138,7 @@ public HashAggregationOperator(
132138

133139
@Override
134140
public boolean needsInput() {
135-
return finished == false;
141+
return emitter == null;
136142
}
137143

138144
@Override
@@ -201,59 +207,97 @@ public void close() {
201207

202208
@Override
203209
public Page getOutput() {
204-
Page p = output;
205-
if (p != null) {
206-
rowsEmitted += p.getPositionCount();
210+
if (emitter == null) {
211+
return null;
207212
}
208-
output = null;
209-
return p;
213+
return emitter.nextPage();
210214
}
211215

212-
@Override
213-
public void finish() {
214-
if (finished) {
215-
return;
216+
private class Emitter implements Releasable {
217+
private final int[] aggBlockCounts;
218+
private int position = -1;
219+
private IntVector allSelected = null;
220+
private Block[] allKeys;
221+
222+
Emitter(int[] aggBlockCounts) {
223+
this.aggBlockCounts = aggBlockCounts;
216224
}
217-
finished = true;
218-
Block[] blocks = null;
219-
IntVector selected = null;
220-
boolean success = false;
221-
try {
222-
selected = blockHash.nonEmpty();
223-
Block[] keys = blockHash.getKeys();
224-
int[] aggBlockCounts = aggregators.stream().mapToInt(GroupingAggregator::evaluateBlockCount).toArray();
225-
blocks = new Block[keys.length + Arrays.stream(aggBlockCounts).sum()];
226-
System.arraycopy(keys, 0, blocks, 0, keys.length);
227-
int offset = keys.length;
228-
for (int i = 0; i < aggregators.size(); i++) {
229-
var aggregator = aggregators.get(i);
230-
aggregator.evaluate(blocks, offset, selected, driverContext);
231-
offset += aggBlockCounts[i];
225+
226+
Page nextPage() {
227+
if (position == -1) {
228+
position = 0;
229+
// TODO: chunk selected and keys
230+
allKeys = blockHash.getKeys();
231+
allSelected = blockHash.nonEmpty();
232+
blockHashClosed = true;
233+
blockHash.close();
232234
}
233-
output = new Page(blocks);
234-
success = true;
235-
} finally {
236-
// selected should always be closed
237-
if (selected != null) {
238-
selected.close();
235+
final int endPosition = Math.toIntExact(Math.min(position + (long) maxPageSize, allSelected.getPositionCount()));
236+
if (endPosition == position) {
237+
return null;
239238
}
240-
if (success == false && blocks != null) {
241-
Releasables.closeExpectNoException(blocks);
239+
final boolean singlePage = position == 0 && endPosition == allSelected.getPositionCount();
240+
final Block[] blocks = new Block[allKeys.length + Arrays.stream(aggBlockCounts).sum()];
241+
IntVector selected = null;
242+
boolean success = false;
243+
try {
244+
if (singlePage) {
245+
this.allSelected.incRef();
246+
selected = this.allSelected;
247+
for (int i = 0; i < allKeys.length; i++) {
248+
allKeys[i].incRef();
249+
blocks[i] = allKeys[i];
250+
}
251+
} else {
252+
final int[] positions = new int[endPosition - position];
253+
for (int i = 0; i < positions.length; i++) {
254+
positions[i] = position + i;
255+
}
256+
selected = allSelected.filter(positions);
257+
for (int keyIndex = 0; keyIndex < allKeys.length; keyIndex++) {
258+
blocks[keyIndex] = allKeys[keyIndex].filter(positions);
259+
}
260+
}
261+
int blockOffset = allKeys.length;
262+
for (int i = 0; i < aggregators.size(); i++) {
263+
aggregators.get(i).evaluate(blocks, blockOffset, selected, driverContext);
264+
blockOffset += aggBlockCounts[i];
265+
}
266+
var output = new Page(blocks);
267+
rowsEmitted += output.getPositionCount();
268+
success = true;
269+
return output;
270+
} finally {
271+
position = endPosition;
272+
Releasables.close(selected, success ? null : Releasables.wrap(blocks));
242273
}
243274
}
275+
276+
@Override
277+
public void close() {
278+
Releasables.close(allSelected, allKeys != null ? Releasables.wrap(allKeys) : null);
279+
}
280+
281+
boolean doneEmitting() {
282+
return allSelected != null && position >= allSelected.getPositionCount();
283+
}
284+
}
285+
286+
@Override
287+
public void finish() {
288+
if (emitter == null) {
289+
emitter = new Emitter(aggregators.stream().mapToInt(GroupingAggregator::evaluateBlockCount).toArray());
290+
}
244291
}
245292

246293
@Override
247294
public boolean isFinished() {
248-
return finished && output == null;
295+
return emitter != null && emitter.doneEmitting();
249296
}
250297

251298
@Override
252299
public void close() {
253-
if (output != null) {
254-
output.releaseBlocks();
255-
}
256-
Releasables.close(blockHash, () -> Releasables.close(aggregators));
300+
Releasables.close(emitter, blockHashClosed ? null : blockHash, () -> Releasables.close(aggregators));
257301
}
258302

259303
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,16 +205,16 @@ public Page getOutput() {
205205
return null;
206206
}
207207
if (valuesAggregator != null) {
208-
try {
209-
return valuesAggregator.getOutput();
210-
} finally {
211-
final ValuesAggregator aggregator = this.valuesAggregator;
212-
this.valuesAggregator = null;
213-
Releasables.close(aggregator);
208+
final Page output = valuesAggregator.getOutput();
209+
if (output == null) {
210+
Releasables.close(valuesAggregator, () -> this.valuesAggregator = null);
211+
} else {
212+
return output;
214213
}
215214
}
216215
if (ordinalAggregators.isEmpty() == false) {
217216
try {
217+
// TODO: chunk output pages
218218
return mergeOrdinalsSegmentResults();
219219
} catch (IOException e) {
220220
throw new UncheckedIOException(e);
@@ -510,6 +510,7 @@ private static class ValuesAggregator implements Releasable {
510510
maxPageSize,
511511
false
512512
),
513+
maxPageSize,
513514
driverContext
514515
);
515516
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorFactories.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public Operator get(DriverContext driverContext) {
6464
return new HashAggregationOperator(
6565
aggregators,
6666
() -> new TimeSeriesBlockHash(tsHashChannel, timeBucketChannel, driverContext),
67+
maxPageSize,
6768
driverContext
6869
);
6970
}
@@ -99,6 +100,7 @@ public Operator get(DriverContext driverContext) {
99100
return new HashAggregationOperator(
100101
aggregators,
101102
() -> BlockHash.build(hashGroups, driverContext.blockFactory(), maxPageSize, false),
103+
maxPageSize,
102104
driverContext
103105
);
104106
}
@@ -127,6 +129,7 @@ public Operator get(DriverContext driverContext) {
127129
return new HashAggregationOperator(
128130
aggregators,
129131
() -> BlockHash.build(groupings, driverContext.blockFactory(), maxPageSize, false),
132+
maxPageSize,
130133
driverContext
131134
);
132135
}

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ public String toString() {
211211
randomPageSize(),
212212
false
213213
),
214+
randomPageSize(),
214215
driverContext
215216
)
216217
);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.util.ArrayList;
4444
import java.util.List;
45+
import java.util.Locale;
4546
import java.util.SortedSet;
4647
import java.util.TreeSet;
4748
import java.util.function.Function;
@@ -102,13 +103,25 @@ private Operator.OperatorFactory simpleWithMode(
102103
if (randomBoolean()) {
103104
supplier = chunkGroups(emitChunkSize, supplier);
104105
}
105-
return new HashAggregationOperator.HashAggregationOperatorFactory(
106+
final int maxPageSize = randomPageSize();
107+
final var hashOperatorFactory = new HashAggregationOperator.HashAggregationOperatorFactory(
106108
List.of(new BlockHash.GroupSpec(0, ElementType.LONG)),
107109
mode,
108110
List.of(supplier.groupingAggregatorFactory(mode, channels(mode))),
109111
randomPageSize(),
110112
null
111113
);
114+
return new Operator.OperatorFactory() {
115+
@Override
116+
public Operator get(DriverContext driverContext) {
117+
return assertingOutputPageSize(hashOperatorFactory.get(driverContext), driverContext.blockFactory(), maxPageSize);
118+
}
119+
120+
@Override
121+
public String describe() {
122+
return hashOperatorFactory.describe();
123+
}
124+
};
112125
}
113126

114127
@Override
@@ -761,4 +774,76 @@ public String describe() {
761774
};
762775
}
763776

777+
static Operator assertingOutputPageSize(Operator operator, BlockFactory blockFactory, int maxPageSize) {
778+
return new Operator() {
779+
private final List<Page> pages = new ArrayList<>();
780+
781+
@Override
782+
public boolean needsInput() {
783+
return operator.needsInput();
784+
}
785+
786+
@Override
787+
public void addInput(Page page) {
788+
operator.addInput(page);
789+
}
790+
791+
@Override
792+
public void finish() {
793+
operator.finish();
794+
}
795+
796+
@Override
797+
public boolean isFinished() {
798+
return operator.isFinished();
799+
}
800+
801+
@Override
802+
public Page getOutput() {
803+
final Page page = operator.getOutput();
804+
if (page != null && page.getPositionCount() > maxPageSize) {
805+
page.releaseBlocks();
806+
throw new AssertionError(
807+
String.format(
808+
Locale.ROOT,
809+
"Operator %s didn't chunk output pages properly; got an output page with %s positions, max_page_size=%s",
810+
operator,
811+
page.getPositionCount(),
812+
maxPageSize
813+
)
814+
);
815+
}
816+
pages.add(page);
817+
if (operator.isFinished() == false) {
818+
return null;
819+
}
820+
// TODO: Remove this workaround. We need to merge pages since we have many existing assertions expect a single out page.
821+
try {
822+
return BlockTestUtils.mergePages(blockFactory, pages);
823+
} finally {
824+
pages.forEach(Page::releaseBlocks);
825+
pages.clear();
826+
}
827+
}
828+
829+
@Override
830+
public Status status() {
831+
return operator.status();
832+
}
833+
834+
@Override
835+
public String toString() {
836+
return operator.toString();
837+
}
838+
839+
@Override
840+
public void close() {
841+
for (Page p : pages) {
842+
p.releaseBlocks();
843+
}
844+
operator.close();
845+
}
846+
};
847+
}
848+
764849
}

0 commit comments

Comments
 (0)