Skip to content

Commit 51c2aa1

Browse files
authored
ESQL: Don't emit empty pages from hashes (#112601)
This stops the `BlockHash`es from emitting empty pages when the batch size and the number of results from a page line up. It doesn't hurt the aggs, but it was making testing more complex than we'd like. Closes #112443 Closes #112442
1 parent 082e721 commit 51c2aa1

File tree

7 files changed

+105
-36
lines changed

7 files changed

+105
-36
lines changed

muted-tests.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,6 @@ tests:
169169
- class: org.elasticsearch.xpack.esql.EsqlAsyncSecurityIT
170170
method: testIndexPatternErrorMessageComparison_ESQL_SearchDSL
171171
issue: https://github.com/elastic/elasticsearch/issues/112630
172-
- class: org.elasticsearch.compute.aggregation.blockhash.BlockHashTests
173-
method: testBytesRefLongHashHugeCombinatorialExplosion {forcePackedHash=false}
174-
issue: https://github.com/elastic/elasticsearch/issues/112442
175-
- class: org.elasticsearch.compute.aggregation.blockhash.BlockHashTests
176-
method: testBytesRefLongHashHugeCombinatorialExplosion {forcePackedHash=true}
177-
issue: https://github.com/elastic/elasticsearch/issues/112443
178172
- class: org.elasticsearch.xpack.ml.integration.MlJobIT
179173
method: testPutJob_GivenFarequoteConfig
180174
issue: https://github.com/elastic/elasticsearch/issues/112382
Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
* for how to add values to it. After adding all values, call {@link #flushRemaining} to
2323
* flush the last batch of values to the aggs.
2424
*/
25-
public class AddBlock implements Releasable {
25+
public class AddPage implements Releasable {
2626
private final BlockFactory blockFactory;
27-
private final int emitBatchSize;
27+
private final long emitBatchSize;
2828
private final GroupingAggregatorFunction.AddInput addInput;
2929

3030
private int positionOffset = 0;
31-
private int added = 0;
31+
/**
32+
* Number of ordinals (or nulls) added so far. This is a {@code long} because callers will
33+
* often perform the combinatorial explosion of values, which can exceed {@link Integer#MAX_VALUE}.
34+
*/
35+
private long added = 0;
3236
private IntBlock.Builder ords;
3337
/**
3438
* State of the current position.
@@ -50,22 +54,26 @@ public class AddBlock implements Releasable {
5054
*/
5155
private int firstOrd = -1;
5256

53-
public AddBlock(BlockFactory blockFactory, int emitBatchSize, GroupingAggregatorFunction.AddInput addInput) {
57+
public AddPage(BlockFactory blockFactory, int emitBatchSize, GroupingAggregatorFunction.AddInput addInput) {
5458
this.blockFactory = blockFactory;
5559
this.emitBatchSize = emitBatchSize;
5660
this.addInput = addInput;
5761

5862
this.ords = blockFactory.newIntBlockBuilder(emitBatchSize);
5963
}
6064

65+
long added() {
66+
return added;
67+
}
68+
6169
/**
6270
* Append a single-valued ordinal. This will flush the ordinals to the aggs
6371
* each time the number of added values reaches a multiple of {@link #emitBatchSize}.
6472
*/
6573
protected final void appendOrdSv(int position, int ord) {
6674
assert firstOrd == -1 : "currently in a multivalue position";
6775
ords.appendInt(ord);
68-
if (++added % emitBatchSize == 0) {
76+
if (++added % emitBatchSize == 0L) {
6977
rollover(position + 1);
7078
}
7179
}
@@ -78,7 +86,7 @@ protected final void appendOrdSv(int position, int ord) {
7886
@Deprecated
7987
protected final void appendNullSv(int position) {
8088
ords.appendNull();
81-
if (++added % emitBatchSize == 0) {
89+
if (++added % emitBatchSize == 0L) {
8290
rollover(position + 1);
8391
}
8492
}
@@ -95,7 +103,7 @@ protected final void appendNullSv(int position) {
95103
* }</pre>
96104
*/
97105
protected final void appendOrdInMv(int position, int ord) {
98-
if (++added % emitBatchSize == 0) {
106+
if (++added % emitBatchSize == 0L) {
99107
switch (firstOrd) {
100108
case -1 -> ords.appendInt(ord);
101109
case -2 -> {
@@ -136,7 +144,20 @@ protected final void finishMv() {
136144
firstOrd = -1;
137145
}
138146

139-
protected final void emitOrds() {
147+
/**
148+
* Call when finished to emit all remaining ordinals to the aggs.
149+
*/
150+
protected final void flushRemaining() {
151+
if (firstOrd != -1) {
152+
throw new IllegalStateException("in the middle of a position");
153+
}
154+
if (added % emitBatchSize != 0) {
155+
// If the % is 0 then either nothing was added or we just flushed, so there isn't any need to flush an empty block.
156+
emitOrds();
157+
}
158+
}
159+
160+
private void emitOrds() {
140161
try (IntBlock ordsBlock = ords.build()) {
141162
addInput.add(positionOffset, ordsBlock);
142163
}
@@ -145,7 +166,7 @@ protected final void emitOrds() {
145166
private void rollover(int position) {
146167
emitOrds();
147168
positionOffset = position;
148-
ords = blockFactory.newIntBlockBuilder(emitBatchSize); // TODO add a clear method to the builder?
169+
ords = blockFactory.newIntBlockBuilder(Math.toIntExact(emitBatchSize));
149170
}
150171

151172
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRef3BlockHash.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private void addVectors(BytesRefVector v1, BytesRefVector v2, BytesRefVector v3,
9898
}
9999
}
100100

101-
private class AddWork extends AddBlock {
101+
private class AddWork extends AddPage {
102102
final IntBlock b1;
103103
final IntBlock b2;
104104
final IntBlock b3;
@@ -137,7 +137,7 @@ void add() {
137137
}
138138
finishMv();
139139
}
140-
emitOrds();
140+
flushRemaining();
141141
}
142142
}
143143

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void close() {
117117
}
118118
}
119119

120-
class AddWork extends AddBlock {
120+
class AddWork extends AddPage {
121121
final Group[] groups;
122122
final int positionCount;
123123
int position;
@@ -142,7 +142,7 @@ void add() {
142142
addMultipleEntries();
143143
}
144144
}
145-
emitOrds();
145+
flushRemaining();
146146
}
147147

148148
private void addSingleEntry() {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/IntLongBlockAdd.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99

1010
import org.elasticsearch.common.util.LongLongHash;
1111
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
12-
import org.elasticsearch.compute.aggregation.blockhash.AddBlock;
12+
import org.elasticsearch.compute.aggregation.blockhash.AddPage;
1313
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1414
import org.elasticsearch.compute.data.BlockFactory;
1515
import org.elasticsearch.compute.data.IntBlock;
1616
import org.elasticsearch.compute.data.LongBlock;
1717

18-
public class IntLongBlockAdd extends AddBlock {
18+
public class IntLongBlockAdd extends AddPage {
1919
private final LongLongHash hash;
2020
private final MultivalueDedupeInt block1;
2121
private final MultivalueDedupeLong block2;
@@ -39,7 +39,7 @@ public void add() {
3939
for (int p = 0; p < positions; p++) {
4040
add1(p);
4141
}
42-
emitOrds();
42+
flushRemaining();
4343
}
4444

4545
private void add1(int position) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/LongLongBlockAdd.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@
99

1010
import org.elasticsearch.common.util.LongLongHash;
1111
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
12-
import org.elasticsearch.compute.aggregation.blockhash.AddBlock;
12+
import org.elasticsearch.compute.aggregation.blockhash.AddPage;
1313
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1414
import org.elasticsearch.compute.data.BlockFactory;
1515
import org.elasticsearch.compute.data.LongBlock;
1616

17-
public class LongLongBlockAdd extends AddBlock {
17+
public class LongLongBlockAdd extends AddPage {
1818
private final LongLongHash hash;
1919
private final MultivalueDedupeLong block1;
2020
private final MultivalueDedupeLong block2;
@@ -38,7 +38,7 @@ public void add() {
3838
for (int p = 0; p < positions; p++) {
3939
add1(p);
4040
}
41-
emitOrds();
41+
flushRemaining();
4242
}
4343

4444
private void add1(int position) {
Lines changed: 65 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,25 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.List;
22+
import java.util.Locale;
2223

2324
import static org.hamcrest.Matchers.equalTo;
2425

25-
public class AddBlockTests extends ESTestCase {
26+
public class AddPageTests extends ESTestCase {
2627
private final BlockFactory blockFactory = BlockFactoryTests.blockFactory(ByteSizeValue.ofGb(1));
2728

2829
public void testSv() {
2930
TestAddInput result = new TestAddInput();
3031
List<Added> expected = new ArrayList<>();
31-
try (AddBlock add = new AddBlock(blockFactory, 3, result)) {
32+
try (AddPage add = new AddPage(blockFactory, 3, result)) {
3233
add.appendOrdSv(0, 0);
3334
add.appendOrdSv(1, 2);
3435
add.appendOrdSv(2, 3);
3536
expected.add(added(0, 0, 2, 3));
3637
assertThat(result.added, equalTo(expected));
3738
add.appendOrdSv(3, 4);
38-
add.emitOrds();
39+
add.flushRemaining();
40+
assertThat(add.added(), equalTo(4L));
3941
}
4042
expected.add(added(3, 4));
4143
assertThat(result.added, equalTo(expected));
@@ -45,7 +47,7 @@ public void testSv() {
4547
public void testMvBlockEndsOnBatchBoundary() {
4648
TestAddInput result = new TestAddInput();
4749
List<Added> expected = new ArrayList<>();
48-
try (AddBlock add = new AddBlock(blockFactory, 3, result)) {
50+
try (AddPage add = new AddPage(blockFactory, 3, result)) {
4951
add.appendOrdInMv(0, 0);
5052
add.appendOrdInMv(0, 2);
5153
add.appendOrdInMv(0, 3);
@@ -58,18 +60,22 @@ public void testMvBlockEndsOnBatchBoundary() {
5860
expected.add(new Added(0, List.of(List.of(4), List.of(0, 2))));
5961
assertThat(result.added, equalTo(expected));
6062
add.finishMv();
61-
add.emitOrds();
63+
add.flushRemaining();
64+
assertThat(add.added(), equalTo(6L));
6265
}
63-
// We uselessly flush an empty position if emitBatchSize lines up with the total count
64-
expected.add(new Added(1, List.of(List.of())));
66+
/*
67+
* We do *not* uselessly flush an empty Block of ordinals. Doing so would
68+
* be a slight performance hit, but, worse, makes testing harder to reason
69+
* about.
70+
*/
6571
assertThat(result.added, equalTo(expected));
6672
assertThat(result.closed, equalTo(true));
6773
}
6874

6975
public void testMvPositionEndOnBatchBoundary() {
7076
TestAddInput result = new TestAddInput();
7177
List<Added> expected = new ArrayList<>();
72-
try (AddBlock add = new AddBlock(blockFactory, 4, result)) {
78+
try (AddPage add = new AddPage(blockFactory, 4, result)) {
7379
add.appendOrdInMv(0, 0);
7480
add.appendOrdInMv(0, 2);
7581
add.appendOrdInMv(0, 3);
@@ -80,7 +86,8 @@ public void testMvPositionEndOnBatchBoundary() {
8086
add.appendOrdInMv(1, 0);
8187
add.appendOrdInMv(1, 2);
8288
add.finishMv();
83-
add.emitOrds();
89+
add.flushRemaining();
90+
assertThat(add.added(), equalTo(6L));
8491
}
8592
// Because the first position ended on a block boundary we uselessly emit an empty position there
8693
expected.add(new Added(0, List.of(List.of(), List.of(0, 2))));
@@ -91,7 +98,7 @@ public void testMvPositionEndOnBatchBoundary() {
9198
public void testMv() {
9299
TestAddInput result = new TestAddInput();
93100
List<Added> expected = new ArrayList<>();
94-
try (AddBlock add = new AddBlock(blockFactory, 5, result)) {
101+
try (AddPage add = new AddPage(blockFactory, 5, result)) {
95102
add.appendOrdInMv(0, 0);
96103
add.appendOrdInMv(0, 2);
97104
add.appendOrdInMv(0, 3);
@@ -102,13 +109,43 @@ public void testMv() {
102109
assertThat(result.added, equalTo(expected));
103110
add.appendOrdInMv(1, 2);
104111
add.finishMv();
105-
add.emitOrds();
112+
add.flushRemaining();
113+
assertThat(add.added(), equalTo(6L));
106114
}
107115
expected.add(new Added(1, List.of(List.of(2))));
108116
assertThat(result.added, equalTo(expected));
109117
assertThat(result.closed, equalTo(true));
110118
}
111119

120+
/**
121+
* Test that we can add more than {@link Integer#MAX_VALUE} values. That's
122+
* more than two billion values. We've made the call as fast as we can.
123+
* Locally this test takes about 40 seconds for Nik.
124+
*/
125+
public void testMvBillions() {
126+
CountingAddInput counter = new CountingAddInput();
127+
try (AddPage add = new AddPage(blockFactory, 5, counter)) {
128+
for (int i = 0; i < Integer.MAX_VALUE; i++) {
129+
add.appendOrdInMv(0, 0);
130+
assertThat(add.added(), equalTo((long) i + 1));
131+
if (i % 5 == 0) {
132+
assertThat(counter.count, equalTo(i / 5));
133+
}
134+
if (i % 10_000_000 == 0) {
135+
logger.info(String.format(Locale.ROOT, "Progress: %02.0f%%", 100 * ((double) i / Integer.MAX_VALUE)));
136+
}
137+
}
138+
add.finishMv();
139+
add.appendOrdInMv(1, 0);
140+
assertThat(add.added(), equalTo(Integer.MAX_VALUE + 1L));
141+
add.appendOrdInMv(1, 0);
142+
assertThat(add.added(), equalTo(Integer.MAX_VALUE + 2L));
143+
add.finishMv();
144+
add.flushRemaining();
145+
assertThat(counter.count, equalTo(Integer.MAX_VALUE / 5 + 1));
146+
}
147+
}
148+
112149
@After
113150
public void breakerClear() {
114151
assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
@@ -151,4 +188,21 @@ public void close() {
151188
closed = true;
152189
}
153190
}
191+
192+
private class CountingAddInput implements GroupingAggregatorFunction.AddInput {
193+
private int count;
194+
195+
@Override
196+
public void add(int positionOffset, IntBlock groupIds) {
197+
count++;
198+
}
199+
200+
@Override
201+
public void add(int positionOffset, IntVector groupIds) {
202+
count++;
203+
}
204+
205+
@Override
206+
public void close() {}
207+
}
154208
}

0 commit comments

Comments
 (0)