Skip to content

Commit c1d9e7e

Browse files
authored
ESQL: Tighten assertion on Block (#112367)
This tightens the invariant on `Block` - namely that there are no `0` length positions, at least none tracked by the `firstValueIndexes` in `ArrayBlock` - instead, the only way a position can be `0` length is to have been created with `appendNull`. The only cause of these `0` length positions was ordinals blocks made by `BlockHash`. This reworks that infrastructure to instead create those with `appendNull`.
1 parent b449f21 commit c1d9e7e

File tree

9 files changed

+325
-117
lines changed

9 files changed

+325
-117
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/AbstractAddBlock.java

Lines changed: 0 additions & 67 deletions
This file was deleted.
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.aggregation.blockhash;
9+
10+
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BlockFactory;
13+
import org.elasticsearch.compute.data.IntBlock;
14+
import org.elasticsearch.compute.data.Page;
15+
import org.elasticsearch.core.Releasable;
16+
17+
/**
18+
* Helper for adding a {@link Page} worth of {@link Block}s to a {@link BlockHash}
19+
* while flushing the ordinals to the aggregations when we've accumulated
20+
* {@link #emitBatchSize} ordinals. See {@link #appendOrdSv} and {@link #appendOrdInMv}
21+
* for how to add values to it. After adding all values, call {@link #emitOrds} to
22+
* flush the last batch of values to the aggs.
23+
*/
24+
public class AddBlock implements Releasable {
25+
// Factory used to create the ordinals builder, both initially and after every rollover.
private final BlockFactory blockFactory;
26+
// Number of appended ordinals (including nulls) after which the current batch is flushed.
private final int emitBatchSize;
27+
// Receiver of each flushed batch of ordinals.
private final GroupingAggregatorFunction.AddInput addInput;
28+
29+
// Offset handed to addInput alongside each emitted block; updated by rollover.
private int positionOffset = 0;
30+
// Running count of appended ordinals and nulls; a flush happens whenever it reaches a multiple of emitBatchSize.
private int added = 0;
31+
// Builder accumulating the current batch of ordinals; replaced with a fresh builder on rollover.
private IntBlock.Builder ords;
32+
/**
33+
* State of the current position.
34+
* <ul>
35+
* <li>If {@code -1} then this position is "empty". It hasn't
36+
* received any calls to {@link #appendOrdInMv}. When
37+
* {@link #appendOrdInMv} is called this will shift into the
38+
* "buffering" state by setting this to the provided ord.</li>
39+
* <li>If {@code >= 0} this position is "buffering" a single
40+
* ordinal. When {@link #appendOrdInMv} is called this will
41+
* {@link Block.Builder#beginPositionEntry() begin} a multivalued
42+
* field, add the buffered ordinal, add the provided ordinal,
43+
* and shift to {@code -2}.</li>
44+
* <li>If {@code -2} then this position is "streaming" and
45+
* calling {@link #appendOrdInMv} will add values immediately.</li>
46+
* </ul>
47+
* There's some extra complexity around emitting buffered values and shifting
48+
* back into {@code -1}, but that's the gist of the states.
49+
*/
50+
private int firstOrd = -1;
51+
52+
/**
 * Build the helper and allocate the first ordinals builder.
 *
 * @param blockFactory factory used to allocate ordinals builders
 * @param emitBatchSize number of appended ordinals after which a batch is flushed;
 *                      also passed to the factory when creating each builder
 * @param addInput receiver of every flushed batch
 */
public AddBlock(BlockFactory blockFactory, int emitBatchSize, GroupingAggregatorFunction.AddInput addInput) {
53+
this.blockFactory = blockFactory;
54+
this.emitBatchSize = emitBatchSize;
55+
this.addInput = addInput;
56+
57+
this.ords = blockFactory.newIntBlockBuilder(emitBatchSize);
58+
}
59+
60+
/**
61+
* Append a single valued ordinal. This will flush the ordinals to the aggs
62+
* if we've added {@link #emitBatchSize}.
63+
*/
64+
protected final void appendOrdSv(int position, int ord) {
65+
assert firstOrd == -1 : "currently in a multivalue position";
66+
ords.appendInt(ord);
67+
if (++added % emitBatchSize == 0) {
68+
// This position is complete, so the next batch starts at the following position.
rollover(position + 1);
69+
}
70+
}
71+
72+
/**
73+
* Append a {@code null} valued ordinal. This will flush the ordinals
74+
* to the aggs if we've added {@link #emitBatchSize}.
75+
* @deprecated nulls should resolve to some value.
76+
*/
77+
@Deprecated
78+
protected final void appendNullSv(int position) {
79+
ords.appendNull();
80+
if (++added % emitBatchSize == 0) {
81+
// This position is complete, so the next batch starts at the following position.
rollover(position + 1);
82+
}
83+
}
84+
85+
/**
86+
* Append a value inside a multivalued ordinal. If the current position is
87+
* not started this will begin the position. This will flush the ordinals to
88+
* the aggs if we've added {@link #emitBatchSize}. This should be used like:
89+
* <pre>{@code
90+
* appendOrdInMv(position, ord);
91+
* appendOrdInMv(position, ord);
92+
* appendOrdInMv(position, ord);
93+
* finishMv();
94+
* }</pre>
95+
*/
96+
protected final void appendOrdInMv(int position, int ord) {
97+
// This ord completes a batch: finish the position in the current builder
// according to the state held in firstOrd, flush, and reset to "empty".
if (++added % emitBatchSize == 0) {
98+
switch (firstOrd) {
99+
// Empty: the ord is the only value for this position in this batch.
case -1 -> ords.appendInt(ord);
100+
// Streaming: a position entry is already open, so add the ord and close it.
case -2 -> {
101+
ords.appendInt(ord);
102+
ords.endPositionEntry();
103+
}
104+
// Buffering: emit the buffered ord and this ord as a two-valued position.
default -> {
105+
assert firstOrd >= 0;
106+
ords.beginPositionEntry();
107+
ords.appendInt(firstOrd);
108+
ords.appendInt(ord);
109+
ords.endPositionEntry();
110+
}
111+
}
112+
// The next batch starts at this same position (not position + 1), so any
// further ords for this position are added to the new builder.
rollover(position);
113+
firstOrd = -1;
114+
return;
115+
}
116+
// No flush needed: advance the state machine described on firstOrd.
switch (firstOrd) {
117+
case -1 -> firstOrd = ord;
118+
case -2 -> ords.appendInt(ord);
119+
default -> {
120+
assert firstOrd >= 0;
121+
ords.beginPositionEntry();
122+
ords.appendInt(firstOrd);
123+
ords.appendInt(ord);
124+
firstOrd = -2;
125+
}
126+
}
127+
}
128+
129+
/**
 * Finish the current multivalued position and reset the state to "empty".
 * <ul>
 * <li>If no ordinal is pending ({@code firstOrd == -1}) a {@code null} is appended.</li>
 * <li>If the position is "streaming" ({@code firstOrd == -2}) the open position entry is ended.</li>
 * <li>Otherwise the single buffered ordinal is appended as a single value.</li>
 * </ul>
 * Because {@link #appendOrdInMv} resets the state to {@code -1} after a flush, calling this
 * immediately after such a flush takes the first branch and appends a {@code null}
 * to the new batch.
 */
protected final void finishMv() {
130+
switch (firstOrd) {
131+
case -1 -> ords.appendNull();
132+
case -2 -> ords.endPositionEntry();
133+
default -> ords.appendInt(firstOrd);
134+
}
135+
firstOrd = -1;
136+
}
137+
138+
/**
 * Build the accumulated ordinals into an {@link IntBlock} and hand it to the
 * {@code addInput} together with the current {@code positionOffset}. The built
 * block is closed when the call returns.
 */
protected final void emitOrds() {
139+
try (IntBlock ordsBlock = ords.build()) {
140+
addInput.add(positionOffset, ordsBlock);
141+
}
142+
}
143+
144+
/**
 * Flush the current batch and start a new one.
 *
 * @param position offset to report with the next emitted batch
 */
private void rollover(int position) {
145+
emitOrds();
146+
positionOffset = position;
147+
ords = blockFactory.newIntBlockBuilder(emitBatchSize); // TODO add a clear method to the builder?
148+
}
149+
150+
/**
 * Close the current ordinals builder.
 */
@Override
151+
public void close() {
152+
ords.close();
153+
}
154+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRef3BlockHash.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private void addVectors(BytesRefVector v1, BytesRefVector v2, BytesRefVector v3,
9898
}
9999
}
100100

101-
private class AddWork extends AbstractAddBlock {
101+
private class AddWork extends AddBlock {
102102
final IntBlock b1;
103103
final IntBlock b2;
104104
final IntBlock b3;
@@ -121,24 +121,21 @@ void add() {
121121
int first3 = b3.getFirstValueIndex(i);
122122
if (v1 == 1 && v2 == 1 && v3 == 1) {
123123
long ord = hashOrdToGroup(finalHash.add(b1.getInt(first1), b2.getInt(first2), b3.getInt(first3)));
124-
ords.appendInt(Math.toIntExact(ord));
125-
addedValue(i);
124+
appendOrdSv(i, Math.toIntExact(ord));
126125
continue;
127126
}
128-
ords.beginPositionEntry();
129127
for (int i1 = 0; i1 < v1; i1++) {
130128
int k1 = b1.getInt(first1 + i1);
131129
for (int i2 = 0; i2 < v2; i2++) {
132130
int k2 = b2.getInt(first2 + i2);
133131
for (int i3 = 0; i3 < v3; i3++) {
134132
int k3 = b3.getInt(first3 + i3);
135133
long ord = hashOrdToGroup(finalHash.add(k1, k2, k3));
136-
ords.appendInt(Math.toIntExact(ord));
137-
addedValueInMultivaluePosition(i);
134+
appendOrdInMv(i, Math.toIntExact(ord));
138135
}
139136
}
140137
}
141-
ords.endPositionEntry();
138+
finishMv();
142139
}
143140
emitOrds();
144141
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void close() {
117117
}
118118
}
119119

120-
class AddWork extends AbstractAddBlock {
120+
class AddWork extends AddBlock {
121121
final Group[] groups;
122122
final int positionCount;
123123
int position;
@@ -147,23 +147,17 @@ void add() {
147147

148148
private void addSingleEntry() {
149149
fillBytesSv(groups);
150-
ords.appendInt(Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
151-
addedValue(position);
150+
appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
152151
}
153152

154153
private void addMultipleEntries() {
155-
ords.beginPositionEntry();
156154
int g = 0;
157155
do {
158156
fillBytesMv(groups, g);
159-
160-
// emit ords
161-
ords.appendInt(Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
162-
addedValueInMultivaluePosition(position);
163-
157+
appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
164158
g = rewindKeys(groups);
165159
} while (g >= 0);
166-
ords.endPositionEntry();
160+
finishMv();
167161
for (Group group : groups) {
168162
group.valueOffset += group.valueCount;
169163
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractArrayBlock.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ private boolean assertInvariants() {
7777
if (firstValueIndexes != null) {
7878
assert firstValueIndexes.length >= getPositionCount() + 1 : firstValueIndexes.length + " < " + positionCount;
7979
for (int i = 0; i < getPositionCount(); i++) {
80-
assert firstValueIndexes[i + 1] >= firstValueIndexes[i] : firstValueIndexes[i + 1] + " < " + firstValueIndexes[i];
80+
assert firstValueIndexes[i + 1] > firstValueIndexes[i] : firstValueIndexes[i + 1] + " <= " + firstValueIndexes[i];
8181
}
8282
}
8383
if (nullsMask != null) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractBlockBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public AbstractBlockBuilder beginPositionEntry() {
7979
}
8080

8181
public AbstractBlockBuilder endPositionEntry() {
82+
assert valueCount > firstValueIndexes[positionCount] : "use appendNull to build an empty position";
8283
positionCount++;
8384
positionEntryIsOpen = false;
8485
if (hasMultiValues == false && valueCount != positionCount) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/IntLongBlockAdd.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@
99

1010
import org.elasticsearch.common.util.LongLongHash;
1111
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
12-
import org.elasticsearch.compute.aggregation.blockhash.AbstractAddBlock;
12+
import org.elasticsearch.compute.aggregation.blockhash.AddBlock;
1313
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
1414
import org.elasticsearch.compute.data.BlockFactory;
1515
import org.elasticsearch.compute.data.IntBlock;
1616
import org.elasticsearch.compute.data.LongBlock;
1717

18-
public class IntLongBlockAdd extends AbstractAddBlock {
18+
public class IntLongBlockAdd extends AddBlock {
1919
private final LongLongHash hash;
2020
private final MultivalueDedupeInt block1;
2121
private final MultivalueDedupeLong block2;
@@ -47,8 +47,7 @@ private void add1(int position) {
4747
int count = block1.block.getValueCount(position);
4848
switch (count) {
4949
case 0 -> {
50-
ords.appendNull();
51-
addedValue(position);
50+
appendNullSv(position);
5251
}
5352
case 1 -> {
5453
block1.w = 1;
@@ -72,8 +71,7 @@ private void add2(int position, boolean work1IsUnique) {
7271
int count = block2.block.getValueCount(position);
7372
switch (count) {
7473
case 0 -> {
75-
ords.appendNull();
76-
addedValue(position);
74+
appendNullSv(position);
7775
}
7876
case 1 -> {
7977
block2.w = 1;
@@ -96,12 +94,10 @@ private void add2(int position, boolean work1IsUnique) {
9694
private void finishAdd(int position, boolean work1IsUnique, boolean work2IsUnique) {
9795
if (block1.w == 1) {
9896
if (block2.w == 1) {
99-
ords.appendInt(Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(block1.work[0], block2.work[0]))));
100-
addedValue(position);
97+
appendOrdSv(position, Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(block1.work[0], block2.work[0]))));
10198
return;
10299
}
103100
}
104-
ords.beginPositionEntry();
105101
if (work1IsUnique) {
106102
if (work2IsUnique) {
107103
finishAddUniqueUnique(position);
@@ -115,7 +111,7 @@ private void finishAdd(int position, boolean work1IsUnique, boolean work2IsUniqu
115111
finishAddSortedSorted(position);
116112
}
117113
}
118-
ords.endPositionEntry();
114+
finishMv();
119115
}
120116

121117
private void finishAddUniqueUnique(int position) {
@@ -156,22 +152,19 @@ private void finishAddSortedSorted(int position) {
156152

157153
private void finishAddUnique(int position, int v1) {
158154
for (int i = 0; i < block2.w; i++) {
159-
ords.appendInt(Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(v1, block2.work[i]))));
160-
addedValueInMultivaluePosition(position);
155+
appendOrdInMv(position, Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(v1, block2.work[i]))));
161156
}
162157
}
163158

164159
private void finishAddSorted(int position, int v1) {
165160
long prev2 = block2.work[0];
166-
ords.appendInt(Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(v1, prev2))));
167-
addedValueInMultivaluePosition(position);
161+
appendOrdInMv(position, Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(v1, prev2))));
168162
for (int i = 1; i < block2.w; i++) {
169163
if (prev2 == block2.work[i]) {
170164
continue;
171165
}
172166
prev2 = block2.work[i];
173-
ords.appendInt(Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(v1, prev2))));
174-
addedValueInMultivaluePosition(position);
167+
appendOrdInMv(position, Math.toIntExact(BlockHash.hashOrdToGroup(hash.add(v1, prev2))));
175168
}
176169
}
177170
}

0 commit comments

Comments
 (0)