Skip to content

Commit 729f1d2

Browse files
committed
better
1 parent b7e4bfa commit 729f1d2

File tree

5 files changed

+60
-88
lines changed

5 files changed

+60
-88
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,7 @@ public record TopNDef(int order, boolean asc, boolean nullsFirst, int limit) {}
130130
public interface EmptyBucketGenerator {
131131
int getEmptyBucketCount();
132132

133-
Block generate(BlockFactory blockFactory, int maxPositionsInBucket);
133+
void generate(Block.Builder blockBuilder);
134134
}
135135

136136
/**

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java

Lines changed: 31 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
2121
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
2222
import org.elasticsearch.compute.data.Block;
23+
import org.elasticsearch.compute.data.DocBlock;
2324
import org.elasticsearch.compute.data.IntArrayBlock;
2425
import org.elasticsearch.compute.data.IntBigArrayBlock;
2526
import org.elasticsearch.compute.data.IntVector;
2627
import org.elasticsearch.compute.data.Page;
27-
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Releasables;
2929
import org.elasticsearch.core.TimeValue;
3030
import org.elasticsearch.index.analysis.AnalysisRegistry;
@@ -34,11 +34,9 @@
3434
import java.util.ArrayList;
3535
import java.util.Arrays;
3636
import java.util.List;
37-
import java.util.Map;
3837
import java.util.Objects;
3938
import java.util.concurrent.atomic.AtomicBoolean;
4039
import java.util.function.Supplier;
41-
import java.util.stream.Collectors;
4240

4341
import static java.util.Objects.requireNonNull;
4442
import static java.util.stream.Collectors.joining;
@@ -160,6 +158,7 @@ public void addInput(Page page) {
160158
Page initialPage = createInitialPage(page);
161159
if (initialPage != null) {
162160
addInputInternal(initialPage);
161+
return;
163162
}
164163
}
165164
addInputInternal(page);
@@ -311,50 +310,40 @@ protected Page wrapPage(Page page) {
311310
return page;
312311
}
313312

314-
private @Nullable Page createInitialPage(Page page) {
315-
if (anyGroupEmitsEmptyBuckets(groups) == false) {
316-
return null;
313+
private Page createInitialPage(Page page) {
314+
// If no groups are generating bucket keys, move on
315+
if (groups.stream().allMatch(g -> g.emptyBucketGenerator() == null)) {
316+
return page;
317317
}
318-
int maxPositionsInBucket = getMaxPositionsInBucket(groups, page);
319-
Map<Integer, BlockHash.GroupSpec> groupByChannel = groups.stream().collect(Collectors.toMap(BlockHash.GroupSpec::channel, x -> x));
320-
Block[] blocks = new Block[page.getBlockCount()];
321-
for (int i = 0; i < page.getBlockCount(); i++) {
322-
if (groupByChannel.containsKey(i)) {
323-
BlockHash.EmptyBucketGenerator emptyBucketGenerator = groupByChannel.get(i).emptyBucketGenerator();
324-
blocks[i] = emptyBucketGenerator != null
325-
? emptyBucketGenerator.generate(driverContext.blockFactory(), maxPositionsInBucket)
326-
: copyValues(page.getBlock(i), maxPositionsInBucket);
327-
} else {
328-
blocks[i] = driverContext.blockFactory().newConstantNullBlock(maxPositionsInBucket);
329-
}
330-
}
331-
return new Page(blocks);
332-
}
333-
334-
private Block copyValues(Block block, int maxPositionsInBucket) {
335-
try (Block.Builder newBlockBuilder = block.elementType().newBlockBuilder(block.getPositionCount(), driverContext.blockFactory())) {
336-
newBlockBuilder.copyFrom(block, 0, block.getPositionCount());
337-
for (int i = block.getPositionCount(); i < maxPositionsInBucket; i++) {
338-
newBlockBuilder.appendNull();
339-
}
340-
return newBlockBuilder.build();
318+
Block.Builder[] blockBuilders = new Block.Builder[page.getBlockCount()];
319+
for (int channel = 0; channel < page.getBlockCount(); channel++) {
320+
Block block = page.getBlock(channel);
321+
blockBuilders[channel] = block.elementType().newBlockBuilder(block.getPositionCount(), driverContext.blockFactory());
322+
blockBuilders[channel].copyFrom(block, 0, block.getPositionCount());
341323
}
342-
}
343-
344-
private static boolean anyGroupEmitsEmptyBuckets(List<BlockHash.GroupSpec> groups) {
345-
return groups.stream().anyMatch(g -> g.emptyBucketGenerator() != null);
346-
}
347-
348-
private static int getMaxPositionsInBucket(List<BlockHash.GroupSpec> groups, Page page) {
349-
int maxPositionCount = 0;
350324
for (BlockHash.GroupSpec group : groups) {
351325
BlockHash.EmptyBucketGenerator emptyBucketGenerator = group.emptyBucketGenerator();
352-
int positionCount = emptyBucketGenerator != null
353-
? emptyBucketGenerator.getEmptyBucketCount()
354-
: page.getBlock(group.channel()).getPositionCount();
355-
maxPositionCount = Math.max(maxPositionCount, positionCount);
326+
if (emptyBucketGenerator != null) {
327+
for (int channel = 0; channel < page.getBlockCount(); channel++) {
328+
if (group.channel() == channel) {
329+
emptyBucketGenerator.generate(blockBuilders[channel]);
330+
} else {
331+
for (int i = 0; i < emptyBucketGenerator.getEmptyBucketCount(); i++) {
332+
if (page.getBlock(channel) instanceof DocBlock) {
333+
// TODO: DocBlock doesn't allow appending nulls
334+
((DocBlock.Builder) blockBuilders[channel]).appendShard(0).appendSegment(0).appendDoc(0);
335+
} else {
336+
blockBuilders[channel].appendNull();
337+
}
338+
}
339+
}
340+
}
341+
}
356342
}
357-
return maxPositionCount;
343+
Block[] blocks = Arrays.stream(blockBuilders).map(Block.Builder::build).toArray(Block[]::new);
344+
Releasables.closeExpectNoException(blockBuilders);
345+
page.releaseBlocks();
346+
return new Page(blocks);
358347
}
359348

360349
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -190,28 +190,32 @@ heightBucket:double
190190

191191
bucketsWithEmptyYear#[skip:-8.13.99, reason:BUCKET renamed in 8.14]
192192
FROM employees
193-
| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1990-01-01T00:00:00Z"
193+
| WHERE hire_date >= "1980-01-01T00:00:00Z" AND hire_date < "1990-01-01T00:00:00Z"
194194
| STATS BY
195-
yearBucket = BUCKET(hire_date, 1 year, "1985-01-01T00:00:00Z", "1990-01-01T00:00:00Z", true),
195+
yearBucket = BUCKET(hire_date, 1 year, "1980-01-01T00:00:00Z", "1990-01-01T00:00:00Z", true),
196196
heightBucket = ROUND(BUCKET(height, 10, 1.0, 2.0), 1)
197197
| SORT yearBucket, heightBucket
198198
;
199199

200200
yearBucket:datetime | heightBucket:double
201-
1985-01-01T00:00:00.000Z | 1.0
201+
1980-01-01T00:00:00.000Z | null
202+
1981-01-01T00:00:00.000Z | null
203+
1982-01-01T00:00:00.000Z | null
204+
1983-01-01T00:00:00.000Z | null
205+
1984-01-01T00:00:00.000Z | null
202206
1985-01-01T00:00:00.000Z | 1.4
203207
1985-01-01T00:00:00.000Z | 1.7
204208
1985-01-01T00:00:00.000Z | 1.8
205209
1985-01-01T00:00:00.000Z | 1.9
206210
1985-01-01T00:00:00.000Z | 2.0
207-
1986-01-01T00:00:00.000Z | 1.1
211+
1985-01-01T00:00:00.000Z | null
208212
1986-01-01T00:00:00.000Z | 1.4
209213
1986-01-01T00:00:00.000Z | 1.5
210214
1986-01-01T00:00:00.000Z | 1.7
211215
1986-01-01T00:00:00.000Z | 1.8
212216
1986-01-01T00:00:00.000Z | 2.0
213217
1986-01-01T00:00:00.000Z | 2.1
214-
1987-01-01T00:00:00.000Z | 1.2
218+
1986-01-01T00:00:00.000Z | null
215219
1987-01-01T00:00:00.000Z | 1.4
216220
1987-01-01T00:00:00.000Z | 1.5
217221
1987-01-01T00:00:00.000Z | 1.6
@@ -220,16 +224,17 @@ yearBucket:datetime | heightBucket:double
220224
1987-01-01T00:00:00.000Z | 1.9
221225
1987-01-01T00:00:00.000Z | 2.0
222226
1987-01-01T00:00:00.000Z | 2.1
223-
1988-01-01T00:00:00.000Z | 1.3
227+
1987-01-01T00:00:00.000Z | null
224228
1988-01-01T00:00:00.000Z | 1.4
225229
1988-01-01T00:00:00.000Z | 1.5
226230
1988-01-01T00:00:00.000Z | 1.7
227231
1988-01-01T00:00:00.000Z | 1.8
228232
1988-01-01T00:00:00.000Z | 1.9
229-
1989-01-01T00:00:00.000Z | 1.4
233+
1988-01-01T00:00:00.000Z | null
230234
1989-01-01T00:00:00.000Z | 1.5
231235
1989-01-01T00:00:00.000Z | 1.7
232236
1989-01-01T00:00:00.000Z | 2.0
237+
1989-01-01T00:00:00.000Z | null
233238
;
234239

235240
bucketsWithEmptyHeight#[skip:-8.13.99, reason:BUCKET renamed in 8.14]
@@ -242,20 +247,17 @@ FROM employees
242247
;
243248

244249
yearBucket:datetime | heightBucket:double
245-
1985-01-01T00:00:00.000Z | 1.0
246250
1985-01-01T00:00:00.000Z | 1.4
247251
1985-01-01T00:00:00.000Z | 1.7
248252
1985-01-01T00:00:00.000Z | 1.8
249253
1985-01-01T00:00:00.000Z | 1.9
250254
1985-01-01T00:00:00.000Z | 2.0
251-
1986-01-01T00:00:00.000Z | 1.1
252255
1986-01-01T00:00:00.000Z | 1.4
253256
1986-01-01T00:00:00.000Z | 1.5
254257
1986-01-01T00:00:00.000Z | 1.7
255258
1986-01-01T00:00:00.000Z | 1.8
256259
1986-01-01T00:00:00.000Z | 2.0
257260
1986-01-01T00:00:00.000Z | 2.1
258-
1987-01-01T00:00:00.000Z | 1.2
259261
1987-01-01T00:00:00.000Z | 1.4
260262
1987-01-01T00:00:00.000Z | 1.5
261263
1987-01-01T00:00:00.000Z | 1.6
@@ -264,16 +266,24 @@ yearBucket:datetime | heightBucket:double
264266
1987-01-01T00:00:00.000Z | 1.9
265267
1987-01-01T00:00:00.000Z | 2.0
266268
1987-01-01T00:00:00.000Z | 2.1
267-
1988-01-01T00:00:00.000Z | 1.3
268269
1988-01-01T00:00:00.000Z | 1.4
269270
1988-01-01T00:00:00.000Z | 1.5
270271
1988-01-01T00:00:00.000Z | 1.7
271272
1988-01-01T00:00:00.000Z | 1.8
272273
1988-01-01T00:00:00.000Z | 1.9
273-
1989-01-01T00:00:00.000Z | 1.4
274274
1989-01-01T00:00:00.000Z | 1.5
275275
1989-01-01T00:00:00.000Z | 1.7
276276
1989-01-01T00:00:00.000Z | 2.0
277+
null | 1.0
278+
null | 1.1
279+
null | 1.2
280+
null | 1.3
281+
null | 1.4
282+
null | 1.5
283+
null | 1.6
284+
null | 1.7
285+
null | 1.8
286+
null | 1.9
277287
;
278288

279289

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/DatetimeEmptyBucketGenerator.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.elasticsearch.common.Rounding;
1212
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1313
import org.elasticsearch.compute.data.Block;
14-
import org.elasticsearch.compute.data.BlockFactory;
15-
import org.elasticsearch.compute.data.ElementType;
1614
import org.elasticsearch.compute.data.LongBlock;
1715
import org.elasticsearch.core.TimeValue;
1816
import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -67,18 +65,9 @@ public int getEmptyBucketCount() {
6765
}
6866

6967
@Override
70-
public Block generate(BlockFactory blockFactory, int maxPositionsInBucket) {
71-
try (LongBlock.Builder newBlockBuilder = (LongBlock.Builder) ElementType.LONG.newBlockBuilder(maxPositionsInBucket, blockFactory)) {
72-
int i = 0;
73-
for (long bucket = rounding.round(from); bucket < to; bucket = rounding.nextRoundingValue(bucket)) {
74-
newBlockBuilder.appendLong(bucket);
75-
i++;
76-
}
77-
while (i < maxPositionsInBucket) {
78-
newBlockBuilder.appendNull();
79-
i++;
80-
}
81-
return newBlockBuilder.build();
68+
public void generate(Block.Builder blockBuilder) {
69+
for (long bucket = rounding.round(from); bucket < to; bucket = rounding.nextRoundingValue(bucket)) {
70+
((LongBlock.Builder) blockBuilder).appendLong(bucket);
8271
}
8372
}
8473

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/NumericEmptyBucketGenerator.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99

1010
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1111
import org.elasticsearch.compute.data.Block;
12-
import org.elasticsearch.compute.data.BlockFactory;
1312
import org.elasticsearch.compute.data.DoubleBlock;
14-
import org.elasticsearch.compute.data.ElementType;
1513
import org.elasticsearch.xpack.esql.core.expression.Expression;
1614
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1715

@@ -38,23 +36,9 @@ public int getEmptyBucketCount() {
3836
}
3937

4038
@Override
41-
public Block generate(BlockFactory blockFactory, int maxPositionsInBucket) {
42-
try (
43-
DoubleBlock.Builder newBlockBuilder = (DoubleBlock.Builder) ElementType.DOUBLE.newBlockBuilder(
44-
maxPositionsInBucket,
45-
blockFactory
46-
)
47-
) {
48-
int i = 0;
49-
for (double bucket = round(Math.floor(from / roundTo) * roundTo, 2); bucket < to; bucket = round(bucket + roundTo, 2)) {
50-
newBlockBuilder.appendDouble(bucket);
51-
i++;
52-
}
53-
while (i < maxPositionsInBucket) {
54-
newBlockBuilder.appendNull();
55-
i++;
56-
}
57-
return newBlockBuilder.build();
39+
public void generate(Block.Builder blockBuilder) {
40+
for (double bucket = round(Math.floor(from / roundTo) * roundTo, 2); bucket < to; bucket = round(bucket + roundTo, 2)) {
41+
((DoubleBlock.Builder) blockBuilder).appendDouble(bucket);
5842
}
5943
}
6044

0 commit comments

Comments
 (0)