Skip to content

Commit 6ca03ee

Browse files
ES|QL: Fix BytesRef2BlockHash (#130705) (#130918)
1 parent 0c923e2 commit 6ca03ee

File tree

3 files changed

+180
-2
lines changed

3 files changed

+180
-2
lines changed

docs/changelog/130705.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 130705
2+
summary: Fix `BytesRef2BlockHash`
3+
area: ES|QL
4+
type: bug
5+
issues: []

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRef2BlockHash.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ public Block[] getKeys() {
145145
try {
146146
try (BytesRefBlock.Builder b1 = blockFactory.newBytesRefBlockBuilder(positions)) {
147147
for (int i = 0; i < positions; i++) {
148-
int k1 = (int) (finalHash.get(i) & 0xffffL);
148+
int k1 = (int) (finalHash.get(i) & 0xffffffffL);
149+
// k1 is always positive, it's how hash values are generated, see BytesRefBlockHash.
150+
// For now, we only manage at most 2^31 hash entries
149151
if (k1 == 0) {
150152
b1.appendNull();
151153
} else {

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import org.apache.lucene.util.BytesRef;
1414
import org.elasticsearch.common.breaker.CircuitBreaker;
15+
import org.elasticsearch.common.lucene.BytesRefs;
1516
import org.elasticsearch.common.unit.ByteSizeValue;
1617
import org.elasticsearch.common.util.BigArrays;
1718
import org.elasticsearch.common.util.MockBigArrays;
@@ -1238,6 +1239,176 @@ public void testLongNull() {
12381239
}, blockFactory.newLongArrayVector(values, values.length).asBlock(), blockFactory.newConstantNullBlock(values.length));
12391240
}
12401241

1242+
public void test2BytesRefsHighCardinalityKey() {
1243+
final Page page;
1244+
int positions1 = 10;
1245+
int positions2 = 100_000;
1246+
if (randomBoolean()) {
1247+
positions1 = 100_000;
1248+
positions2 = 10;
1249+
}
1250+
final int totalPositions = positions1 * positions2;
1251+
try (
1252+
BytesRefBlock.Builder builder1 = blockFactory.newBytesRefBlockBuilder(totalPositions);
1253+
BytesRefBlock.Builder builder2 = blockFactory.newBytesRefBlockBuilder(totalPositions);
1254+
) {
1255+
for (int i = 0; i < positions1; i++) {
1256+
for (int p = 0; p < positions2; p++) {
1257+
builder1.appendBytesRef(new BytesRef("abcdef" + i));
1258+
builder2.appendBytesRef(new BytesRef("abcdef" + p));
1259+
}
1260+
}
1261+
page = new Page(builder1.build(), builder2.build());
1262+
}
1263+
record Output(int offset, IntBlock block, IntVector vector) implements Releasable {
1264+
@Override
1265+
public void close() {
1266+
Releasables.close(block, vector);
1267+
}
1268+
}
1269+
List<Output> output = new ArrayList<>();
1270+
1271+
try (BlockHash hash1 = new BytesRef2BlockHash(blockFactory, 0, 1, totalPositions);) {
1272+
hash1.add(page, new GroupingAggregatorFunction.AddInput() {
1273+
@Override
1274+
public void add(int positionOffset, IntBlock groupIds) {
1275+
groupIds.incRef();
1276+
output.add(new Output(positionOffset, groupIds, null));
1277+
}
1278+
1279+
@Override
1280+
public void add(int positionOffset, IntVector groupIds) {
1281+
groupIds.incRef();
1282+
output.add(new Output(positionOffset, null, groupIds));
1283+
}
1284+
1285+
@Override
1286+
public void close() {
1287+
fail("hashes should not close AddInput");
1288+
}
1289+
});
1290+
1291+
Block[] keys = hash1.getKeys();
1292+
try {
1293+
Set<String> distinctKeys = new HashSet<>();
1294+
BytesRefBlock block0 = (BytesRefBlock) keys[0];
1295+
BytesRefBlock block1 = (BytesRefBlock) keys[1];
1296+
BytesRef scratch = new BytesRef();
1297+
StringBuilder builder = new StringBuilder();
1298+
for (int i = 0; i < totalPositions; i++) {
1299+
builder.setLength(0);
1300+
builder.append(BytesRefs.toString(block0.getBytesRef(i, scratch)));
1301+
builder.append("#");
1302+
builder.append(BytesRefs.toString(block1.getBytesRef(i, scratch)));
1303+
distinctKeys.add(builder.toString());
1304+
}
1305+
assertThat(distinctKeys.size(), equalTo(totalPositions));
1306+
} finally {
1307+
Releasables.close(keys);
1308+
}
1309+
} finally {
1310+
Releasables.close(output);
1311+
page.releaseBlocks();
1312+
}
1313+
}
1314+
1315+
public void test2BytesRefs() {
1316+
final Page page;
1317+
final int positions = randomIntBetween(1, 1000);
1318+
final boolean generateVector = randomBoolean();
1319+
try (
1320+
BytesRefBlock.Builder builder1 = blockFactory.newBytesRefBlockBuilder(positions);
1321+
BytesRefBlock.Builder builder2 = blockFactory.newBytesRefBlockBuilder(positions);
1322+
) {
1323+
List<BytesRefBlock.Builder> builders = List.of(builder1, builder2);
1324+
for (int p = 0; p < positions; p++) {
1325+
for (BytesRefBlock.Builder builder : builders) {
1326+
int valueCount = generateVector ? 1 : between(0, 3);
1327+
switch (valueCount) {
1328+
case 0 -> builder.appendNull();
1329+
case 1 -> builder.appendBytesRef(new BytesRef(Integer.toString(between(1, 100))));
1330+
default -> {
1331+
builder.beginPositionEntry();
1332+
for (int v = 0; v < valueCount; v++) {
1333+
builder.appendBytesRef(new BytesRef(Integer.toString(between(1, 100))));
1334+
}
1335+
builder.endPositionEntry();
1336+
}
1337+
}
1338+
}
1339+
}
1340+
page = new Page(builder1.build(), builder2.build());
1341+
}
1342+
final int emitBatchSize = between(positions, 10 * 1024);
1343+
var groupSpecs = List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF), new BlockHash.GroupSpec(1, ElementType.BYTES_REF));
1344+
record Output(int offset, IntBlock block, IntVector vector) implements Releasable {
1345+
@Override
1346+
public void close() {
1347+
Releasables.close(block, vector);
1348+
}
1349+
}
1350+
List<Output> output1 = new ArrayList<>();
1351+
List<Output> output2 = new ArrayList<>();
1352+
try (
1353+
BlockHash hash1 = new BytesRef2BlockHash(blockFactory, 0, 1, emitBatchSize);
1354+
BlockHash hash2 = new PackedValuesBlockHash(groupSpecs, blockFactory, emitBatchSize)
1355+
) {
1356+
hash1.add(page, new GroupingAggregatorFunction.AddInput() {
1357+
@Override
1358+
public void add(int positionOffset, IntBlock groupIds) {
1359+
groupIds.incRef();
1360+
output1.add(new Output(positionOffset, groupIds, null));
1361+
}
1362+
1363+
@Override
1364+
public void add(int positionOffset, IntVector groupIds) {
1365+
groupIds.incRef();
1366+
output1.add(new Output(positionOffset, null, groupIds));
1367+
}
1368+
1369+
@Override
1370+
public void close() {
1371+
fail("hashes should not close AddInput");
1372+
}
1373+
});
1374+
hash2.add(page, new GroupingAggregatorFunction.AddInput() {
1375+
@Override
1376+
public void add(int positionOffset, IntBlock groupIds) {
1377+
groupIds.incRef();
1378+
output2.add(new Output(positionOffset, groupIds, null));
1379+
}
1380+
1381+
@Override
1382+
public void add(int positionOffset, IntVector groupIds) {
1383+
groupIds.incRef();
1384+
output2.add(new Output(positionOffset, null, groupIds));
1385+
}
1386+
1387+
@Override
1388+
public void close() {
1389+
fail("hashes should not close AddInput");
1390+
}
1391+
});
1392+
assertThat(output1.size(), equalTo(output2.size()));
1393+
for (int i = 0; i < output1.size(); i++) {
1394+
Output o1 = output1.get(i);
1395+
Output o2 = output2.get(i);
1396+
assertThat(o1.offset, equalTo(o2.offset));
1397+
if (o1.vector != null) {
1398+
assertNull(o1.block);
1399+
assertThat(o1.vector, equalTo(o2.vector != null ? o2.vector : o2.block.asVector()));
1400+
} else {
1401+
assertNull(o2.vector);
1402+
assertThat(o1.block, equalTo(o2.block));
1403+
}
1404+
}
1405+
} finally {
1406+
Releasables.close(output1);
1407+
Releasables.close(output2);
1408+
page.releaseBlocks();
1409+
}
1410+
}
1411+
12411412
public void test3BytesRefs() {
12421413
final Page page;
12431414
final int positions = randomIntBetween(1, 1000);
@@ -1320,7 +1491,7 @@ public void close() {
13201491
fail("hashes should not close AddInput");
13211492
}
13221493
});
1323-
assertThat(output1.size(), equalTo(output1.size()));
1494+
assertThat(output1.size(), equalTo(output2.size()));
13241495
for (int i = 0; i < output1.size(); i++) {
13251496
Output o1 = output1.get(i);
13261497
Output o2 = output2.get(i);

0 commit comments

Comments
 (0)