Skip to content

Commit 6859b11

Browse files
authored
Emit ordinals for BytesRefLongBlockHash (#136704)
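This change makes BytesRefLongBlockHash.getKeys() emit its BytesRef keys as an OrdinalBytesRefBlock (an ordinals block plus a dictionary copied from the bytes hash) whenever OrdinalBytesRefBlock.isDense(positions, dictionarySize) holds; otherwise it falls back to building a plain BytesRefBlock as before.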
1 parent edb97d3 commit 6859b11

File tree

3 files changed

+95
-19
lines changed

3 files changed

+95
-19
lines changed

server/src/main/java/org/elasticsearch/common/util/BytesRefArray.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.core.Releasables;
2121

2222
import java.io.IOException;
23+
import java.io.UncheckedIOException;
2324

2425
/**
2526
* Compact serializable container for BytesRefs
@@ -136,6 +137,40 @@ public static BytesRefArray takeOwnershipOf(BytesRefArray other) {
136137
return b;
137138
}
138139

140+
/**
141+
* Creates a deep copy of the given BytesRefArray.
* <p>
* Both the start offsets and the backing bytes are copied element by element into arrays newly
* allocated from {@code other.bigArrays}; the returned instance is then built from those new arrays,
* {@code other.size} and the same {@code bigArrays}. If anything throws before the result has been
* constructed, the arrays allocated so far are released before the exception propagates.
*
* @param other the array to copy
* @return a new BytesRefArray built from copies of {@code other}'s offsets and bytes
* @throws UncheckedIOException if iterating over {@code other}'s bytes throws an IOException
142+
*/
143+
public static BytesRefArray deepCopy(BytesRefArray other) {
144+
// All three start as null so the finally block can tell how far construction got.
LongArray startOffsets = null;
145+
ByteArray bytes = null;
146+
BytesRefArray result = null;
147+
try {
148+
startOffsets = other.bigArrays.newLongArray(other.startOffsets.size());
149+
// Copies every slot of the offsets array (bounded by startOffsets.size(), not by other.size).
for (long i = 0; i < other.startOffsets.size(); i++) {
150+
startOffsets.set(i, other.startOffsets.get(i));
151+
}
152+
bytes = other.bigArrays.newByteArray(other.bytes.size());
153+
BytesRefIterator it = other.bytes.iterator();
154+
BytesRef ref;
155+
// Write position in the destination; advanced by the length of each slice the iterator returns.
long pos = 0;
156+
try {
157+
while ((ref = it.next()) != null) {
158+
bytes.set(pos, ref.bytes, ref.offset, ref.length);
159+
pos += ref.length;
160+
}
161+
} catch (IOException e) {
162+
// With assertions enabled this fails loudly; otherwise the IOException is rethrown unchecked.
assert false : new AssertionError("BytesRefIterator should not throw IOException", e);
163+
throw new UncheckedIOException(e);
164+
}
165+
result = new BytesRefArray(startOffsets, bytes, other.size, other.bigArrays);
166+
return result;
167+
} finally {
168+
// result is still null only when something above threw, so the new arrays must be released here.
if (result == null) {
169+
Releasables.closeExpectNoException(startOffsets, bytes);
170+
}
171+
}
172+
}
173+
139174
@Override
140175
public void writeTo(StreamOutput out) throws IOException {
141176
assert startOffsets != null : "using BytesRefArray after ownership taken";

server/src/test/java/org/elasticsearch/common/util/BytesRefArrayTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ public void testReadWrittenRepeated(boolean halfEmpty, int listSize) {
164164
testReadWritten(IntStream.range(0, listSize).mapToObj(i -> values).flatMap(List::stream).toList(), 10);
165165
}
166166

167+
/**
 * Checks that {@link BytesRefArray#deepCopy} produces an array equal to its source, both when
 * copying a randomly generated array and when copying a copy. All three arrays are closed by the
 * try-with-resources block.
 */
public void testDeepCopy() {
168+
try (
169+
BytesRefArray bytes1 = randomArray(between(1, 50_000), between(10, 50_000), mockBigArrays());
170+
BytesRefArray bytes2 = BytesRefArray.deepCopy(bytes1);
171+
// Copy of a copy: verifies that a deep-copied array can itself be deep-copied.
BytesRefArray bytes3 = BytesRefArray.deepCopy(bytes2)
172+
) {
173+
assertEquality(bytes1, bytes2);
174+
// The second-generation copy is compared against the original, not against bytes2.
assertEquality(bytes1, bytes3);
175+
}
176+
}
177+
167178
private void testReadWritten(List<BytesRef> values, int initialCapacity) {
168179
try (BytesRefArray array = new BytesRefArray(initialCapacity, mockBigArrays())) {
169180
for (BytesRef v : values) {
@@ -191,7 +202,7 @@ private void assertEquality(BytesRefArray original, BytesRefArray copy) {
191202
for (int i = 0; i < original.size(); ++i) {
192203
original.get(i, scratch);
193204
copy.get(i, scratch2);
194-
assertEquals(scratch, scratch2);
205+
assertEquals(Integer.toString(i), scratch, scratch2);
195206
}
196207
}
197208
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefLongBlockHash.java

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.unit.ByteSizeValue;
1212
import org.elasticsearch.common.util.BigArrays;
1313
import org.elasticsearch.common.util.BitArray;
14+
import org.elasticsearch.common.util.BytesRefArray;
1415
import org.elasticsearch.common.util.LongLongHash;
1516
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
1617
import org.elasticsearch.compute.aggregation.SeenGroupIds;
@@ -22,6 +23,7 @@
2223
import org.elasticsearch.compute.data.IntVector;
2324
import org.elasticsearch.compute.data.LongBlock;
2425
import org.elasticsearch.compute.data.LongVector;
26+
import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
2527
import org.elasticsearch.compute.data.Page;
2628
import org.elasticsearch.compute.operator.mvdedupe.IntLongBlockAdd;
2729
import org.elasticsearch.core.ReleasableIterator;
@@ -143,28 +145,56 @@ public ReleasableIterator<IntBlock> lookup(Page page, ByteSizeValue targetBlockS
143145

144146
@Override
145147
public Block[] getKeys() {
146-
int positions = (int) finalHash.size();
147148
BytesRefBlock k1 = null;
148149
LongVector k2 = null;
149-
try (
150-
BytesRefBlock.Builder keys1 = blockFactory.newBytesRefBlockBuilder(positions);
151-
LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions)
152-
) {
153-
BytesRef scratch = new BytesRef();
154-
for (long i = 0; i < positions; i++) {
155-
keys2.appendLong(finalHash.getKey2(i));
156-
long h1 = finalHash.getKey1(i);
157-
if (h1 == 0) {
158-
keys1.appendNull();
159-
} else {
160-
keys1.appendBytesRef(bytesHash.hash.get(h1 - 1, scratch));
150+
int positions = (int) finalHash.size();
151+
if (OrdinalBytesRefBlock.isDense(positions, bytesHash.hash.size())) {
152+
try (var ordinals = blockFactory.newIntBlockBuilder(positions); var longs = blockFactory.newLongVectorBuilder(positions)) {
153+
for (long p = 0; p < positions; p++) {
154+
long h1 = finalHash.getKey1(p);
155+
if (h1 == 0) {
156+
ordinals.appendNull();
157+
} else {
158+
ordinals.appendInt(Math.toIntExact(h1 - 1));
159+
}
160+
longs.appendLong(finalHash.getKey2(p));
161+
}
162+
// TODO: make takeOwnershipOf work?
163+
BytesRefArray bytes = BytesRefArray.deepCopy(bytesHash.hash.getBytesRefs());
164+
try {
165+
var dict = blockFactory.newBytesRefArrayVector(bytes, Math.toIntExact(bytes.size()));
166+
bytes = null; // transfer ownership to dict
167+
k1 = new OrdinalBytesRefBlock(ordinals.build(), dict);
168+
} finally {
169+
Releasables.closeExpectNoException(bytes);
170+
}
171+
k2 = longs.build();
172+
} finally {
173+
if (k2 == null) {
174+
Releasables.closeExpectNoException(k1);
161175
}
162176
}
163-
k1 = keys1.build();
164-
k2 = keys2.build();
165-
} finally {
166-
if (k2 == null) {
167-
Releasables.closeExpectNoException(k1);
177+
} else {
178+
try (
179+
BytesRefBlock.Builder keys1 = blockFactory.newBytesRefBlockBuilder(positions);
180+
LongVector.Builder keys2 = blockFactory.newLongVectorBuilder(positions)
181+
) {
182+
BytesRef scratch = new BytesRef();
183+
for (long i = 0; i < positions; i++) {
184+
long h1 = finalHash.getKey1(i);
185+
if (h1 == 0) {
186+
keys1.appendNull();
187+
} else {
188+
keys1.appendBytesRef(bytesHash.hash.get(h1 - 1, scratch));
189+
}
190+
keys2.appendLong(finalHash.getKey2(i));
191+
}
192+
k1 = keys1.build();
193+
k2 = keys2.build();
194+
} finally {
195+
if (k2 == null) {
196+
Releasables.closeExpectNoException(k1);
197+
}
168198
}
169199
}
170200
if (reverseOutput) {

0 commit comments

Comments
 (0)