Skip to content

Commit 1bf6e69

Browse files
ES|QL: Fix BytesRef2BlockHash (elastic#130705) (elastic#130907)
1 parent bb8a991 commit 1bf6e69

File tree

3 files changed

+180
-2
lines changed

3 files changed

+180
-2
lines changed

docs/changelog/130705.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130705
2+
summary: Fix `BytesRef2BlockHash`
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRef2BlockHash.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ public Block[] getKeys() {
145145
try {
146146
try (BytesRefBlock.Builder b1 = blockFactory.newBytesRefBlockBuilder(positions)) {
147147
for (int i = 0; i < positions; i++) {
148-
int k1 = (int) (finalHash.get(i) & 0xffffL);
148+
int k1 = (int) (finalHash.get(i) & 0xffffffffL);
149+
// k1 is never negative (0 is handled as null just below); that is how hash values are generated, see BytesRefBlockHash.
150+
// For now, we only manage at most 2^31 hash entries
149151
if (k1 == 0) {
150152
b1.appendNull();
151153
} else {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1212

1313
import org.apache.lucene.util.BytesRef;
14+
import org.elasticsearch.common.lucene.BytesRefs;
1415
import org.elasticsearch.common.unit.ByteSizeValue;
1516
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
1617
import org.elasticsearch.compute.data.Block;
@@ -1225,6 +1226,176 @@ public void testLongNull() {
12251226
}, blockFactory.newLongArrayVector(values, values.length).asBlock(), blockFactory.newConstantNullBlock(values.length));
12261227
}
12271228

1229+
public void test2BytesRefsHighCardinalityKey() {
1230+
final Page page;
1231+
int positions1 = 10;
1232+
int positions2 = 100_000;
1233+
if (randomBoolean()) {
1234+
positions1 = 100_000;
1235+
positions2 = 10;
1236+
}
1237+
final int totalPositions = positions1 * positions2;
1238+
try (
1239+
BytesRefBlock.Builder builder1 = blockFactory.newBytesRefBlockBuilder(totalPositions);
1240+
BytesRefBlock.Builder builder2 = blockFactory.newBytesRefBlockBuilder(totalPositions);
1241+
) {
1242+
for (int i = 0; i < positions1; i++) {
1243+
for (int p = 0; p < positions2; p++) {
1244+
builder1.appendBytesRef(new BytesRef("abcdef" + i));
1245+
builder2.appendBytesRef(new BytesRef("abcdef" + p));
1246+
}
1247+
}
1248+
page = new Page(builder1.build(), builder2.build());
1249+
}
1250+
record Output(int offset, IntBlock block, IntVector vector) implements Releasable {
1251+
@Override
1252+
public void close() {
1253+
Releasables.close(block, vector);
1254+
}
1255+
}
1256+
List<Output> output = new ArrayList<>();
1257+
1258+
try (BlockHash hash1 = new BytesRef2BlockHash(blockFactory, 0, 1, totalPositions);) {
1259+
hash1.add(page, new GroupingAggregatorFunction.AddInput() {
1260+
@Override
1261+
public void add(int positionOffset, IntBlock groupIds) {
1262+
groupIds.incRef();
1263+
output.add(new Output(positionOffset, groupIds, null));
1264+
}
1265+
1266+
@Override
1267+
public void add(int positionOffset, IntVector groupIds) {
1268+
groupIds.incRef();
1269+
output.add(new Output(positionOffset, null, groupIds));
1270+
}
1271+
1272+
@Override
1273+
public void close() {
1274+
fail("hashes should not close AddInput");
1275+
}
1276+
});
1277+
1278+
Block[] keys = hash1.getKeys();
1279+
try {
1280+
Set<String> distinctKeys = new HashSet<>();
1281+
BytesRefBlock block0 = (BytesRefBlock) keys[0];
1282+
BytesRefBlock block1 = (BytesRefBlock) keys[1];
1283+
BytesRef scratch = new BytesRef();
1284+
StringBuilder builder = new StringBuilder();
1285+
for (int i = 0; i < totalPositions; i++) {
1286+
builder.setLength(0);
1287+
builder.append(BytesRefs.toString(block0.getBytesRef(i, scratch)));
1288+
builder.append("#");
1289+
builder.append(BytesRefs.toString(block1.getBytesRef(i, scratch)));
1290+
distinctKeys.add(builder.toString());
1291+
}
1292+
assertThat(distinctKeys.size(), equalTo(totalPositions));
1293+
} finally {
1294+
Releasables.close(keys);
1295+
}
1296+
} finally {
1297+
Releasables.close(output);
1298+
page.releaseBlocks();
1299+
}
1300+
}
1301+
1302+
public void test2BytesRefs() {
1303+
final Page page;
1304+
final int positions = randomIntBetween(1, 1000);
1305+
final boolean generateVector = randomBoolean();
1306+
try (
1307+
BytesRefBlock.Builder builder1 = blockFactory.newBytesRefBlockBuilder(positions);
1308+
BytesRefBlock.Builder builder2 = blockFactory.newBytesRefBlockBuilder(positions);
1309+
) {
1310+
List<BytesRefBlock.Builder> builders = List.of(builder1, builder2);
1311+
for (int p = 0; p < positions; p++) {
1312+
for (BytesRefBlock.Builder builder : builders) {
1313+
int valueCount = generateVector ? 1 : between(0, 3);
1314+
switch (valueCount) {
1315+
case 0 -> builder.appendNull();
1316+
case 1 -> builder.appendBytesRef(new BytesRef(Integer.toString(between(1, 100))));
1317+
default -> {
1318+
builder.beginPositionEntry();
1319+
for (int v = 0; v < valueCount; v++) {
1320+
builder.appendBytesRef(new BytesRef(Integer.toString(between(1, 100))));
1321+
}
1322+
builder.endPositionEntry();
1323+
}
1324+
}
1325+
}
1326+
}
1327+
page = new Page(builder1.build(), builder2.build());
1328+
}
1329+
final int emitBatchSize = between(positions, 10 * 1024);
1330+
var groupSpecs = List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.BYTES_REF));
1331+
record Output(int offset, IntBlock block, IntVector vector) implements Releasable {
1332+
@Override
1333+
public void close() {
1334+
Releasables.close(block, vector);
1335+
}
1336+
}
1337+
List<Output> output1 = new ArrayList<>();
1338+
List<Output> output2 = new ArrayList<>();
1339+
try (
1340+
BlockHash hash1 = new BytesRef2BlockHash(blockFactory, 0, 1, emitBatchSize);
1341+
BlockHash hash2 = new PackedValuesBlockHash(groupSpecs, blockFactory, emitBatchSize)
1342+
) {
1343+
hash1.add(page, new GroupingAggregatorFunction.AddInput() {
1344+
@Override
1345+
public void add(int positionOffset, IntBlock groupIds) {
1346+
groupIds.incRef();
1347+
output1.add(new Output(positionOffset, groupIds, null));
1348+
}
1349+
1350+
@Override
1351+
public void add(int positionOffset, IntVector groupIds) {
1352+
groupIds.incRef();
1353+
output1.add(new Output(positionOffset, null, groupIds));
1354+
}
1355+
1356+
@Override
1357+
public void close() {
1358+
fail("hashes should not close AddInput");
1359+
}
1360+
});
1361+
hash2.add(page, new GroupingAggregatorFunction.AddInput() {
1362+
@Override
1363+
public void add(int positionOffset, IntBlock groupIds) {
1364+
groupIds.incRef();
1365+
output2.add(new Output(positionOffset, groupIds, null));
1366+
}
1367+
1368+
@Override
1369+
public void add(int positionOffset, IntVector groupIds) {
1370+
groupIds.incRef();
1371+
output2.add(new Output(positionOffset, null, groupIds));
1372+
}
1373+
1374+
@Override
1375+
public void close() {
1376+
fail("hashes should not close AddInput");
1377+
}
1378+
});
1379+
assertThat(output1.size(), equalTo(output2.size()));
1380+
for (int i = 0; i < output1.size(); i++) {
1381+
Output o1 = output1.get(i);
1382+
Output o2 = output2.get(i);
1383+
assertThat(o1.offset, equalTo(o2.offset));
1384+
if (o1.vector != null) {
1385+
assertNull(o1.block);
1386+
assertThat(o1.vector, equalTo(o2.vector != null ? o2.vector : o2.block.asVector()));
1387+
} else {
1388+
assertNull(o2.vector);
1389+
assertThat(o1.block, equalTo(o2.block));
1390+
}
1391+
}
1392+
} finally {
1393+
Releasables.close(output1);
1394+
Releasables.close(output2);
1395+
page.releaseBlocks();
1396+
}
1397+
}
1398+
12281399
public void test3BytesRefs() {
12291400
final Page page;
12301401
final int positions = randomIntBetween(1, 1000);
@@ -1307,7 +1478,7 @@ public void close() {
13071478
fail("hashes should not close AddInput");
13081479
}
13091480
});
1310-
assertThat(output1.size(), equalTo(output1.size()));
1481+
assertThat(output1.size(), equalTo(output2.size()));
13111482
for (int i = 0; i < output1.size(); i++) {
13121483
Output o1 = output1.get(i);
13131484
Output o2 = output2.get(i);

0 commit comments

Comments
 (0)