Skip to content

Commit 9cfeaac

Browse files
authored
Optimize BytesArray::indexOf, which is used heavily in ndjson parsing (#135087)
This commit optimizes BytesArray::indexOf, which is used heavily in ndjson parsing. I simply extracted the indexOf functionality into ESVectorUtil, so that we could have a default implementation (the current one), and a Panama Vectorized one. BytesArray::indexOf is used heavily for newline json delimited parsing, which backs the _bulk API. Here’s the result of the benchmark run on Linux x64, Intel SkyLake: Size Baseline ops/ms Panama ops/ms Speedup 4,096 2,439 41,689 ~17.1x 16,384 624 12,297 ~19.7x 65,536 156 1,689 ~10.8x 1,048,576 9.8 73.4 ~7.5x The Panama version is 7–20× faster than the baseline depending on input size. For small and medium arrays (4 KB and 16 KB), the speedup is dramatic (~17–20×), showing that the vectorization pays off most when the data set fits easily in cache. For larger arrays (64 KB – 1 MB), the relative advantage drops (~10× at 64 KB, ~7.5× at 1 MB). That’s expected - as array size grows, the bottleneck shifts from CPU to memory bandwidth, reducing the benefit of SIMD. In short: Panama delivers an order-of-magnitude improvement over the scalar implementation, especially for small-to-medium buffers, while still offering a solid multiple (~7×) speedup even on very large buffers. The Panama implementation is between 10-6x faster on AVX2, and ~5x times faster on ARM.
1 parent d2fae99 commit 9cfeaac

File tree

10 files changed

+374
-84
lines changed

10 files changed

+374
-84
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
package org.elasticsearch.benchmark.bytes;
10+
11+
import org.elasticsearch.common.bytes.BytesArray;
12+
import org.elasticsearch.common.logging.LogConfigurator;
13+
import org.openjdk.jmh.annotations.Benchmark;
14+
import org.openjdk.jmh.annotations.BenchmarkMode;
15+
import org.openjdk.jmh.annotations.Fork;
16+
import org.openjdk.jmh.annotations.Measurement;
17+
import org.openjdk.jmh.annotations.Mode;
18+
import org.openjdk.jmh.annotations.OutputTimeUnit;
19+
import org.openjdk.jmh.annotations.Param;
20+
import org.openjdk.jmh.annotations.Scope;
21+
import org.openjdk.jmh.annotations.Setup;
22+
import org.openjdk.jmh.annotations.State;
23+
import org.openjdk.jmh.annotations.Warmup;
24+
25+
import java.util.concurrent.TimeUnit;
26+
27+
@Warmup(iterations = 4, time = 1)
28+
@Measurement(iterations = 5, time = 1)
29+
@BenchmarkMode(Mode.Throughput)
30+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
31+
@State(Scope.Thread)
32+
@Fork(value = 1)
33+
public class BytesArrayIndexOfBenchmark {
34+
35+
static {
36+
LogConfigurator.configureESLogging(); // native access requires logging to be initialized
37+
}
38+
39+
static final byte MARKER = (byte) '\n';
40+
41+
@Param(value = { "64", "127", "128", "4096", "16384", "65536", "1048576" })
42+
public int size;
43+
44+
BytesArray bytesArray;
45+
46+
@Setup
47+
public void setup() {
48+
byte[] bytes = new byte[size];
49+
bytes[bytes.length - 1] = MARKER;
50+
bytesArray = new BytesArray(bytes, 0, bytes.length);
51+
}
52+
53+
@Benchmark
54+
public int indexOf() {
55+
return bytesArray.indexOf(MARKER, 0);
56+
}
57+
58+
@Benchmark
59+
@Fork(jvmArgsPrepend = { "--add-modules=jdk.incubator.vector" })
60+
public int indexOfPanama() {
61+
return bytesArray.indexOf(MARKER, 0);
62+
}
63+
64+
@Benchmark
65+
public int withOffsetIndexOf() {
66+
return bytesArray.indexOf(MARKER, 1);
67+
}
68+
69+
@Benchmark
70+
@Fork(jvmArgsPrepend = { "--add-modules=jdk.incubator.vector" })
71+
public int withOffsetIndexPanama() {
72+
return bytesArray.indexOf(MARKER, 1);
73+
}
74+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.bytes;
11+
12+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
13+
14+
import org.elasticsearch.test.ESTestCase;
15+
import org.openjdk.jmh.annotations.Param;
16+
17+
import java.util.Arrays;
18+
19+
public class BytesArrayIndexOfBenchmarkTests extends ESTestCase {
20+
21+
final int size;
22+
23+
public BytesArrayIndexOfBenchmarkTests(int size) {
24+
this.size = size;
25+
}
26+
27+
public void testIndexOf() {
28+
var bench = new BytesArrayIndexOfBenchmark();
29+
bench.size = size;
30+
bench.setup();
31+
assertEquals(size - 1, bench.indexOf());
32+
assertEquals(size - 1, bench.indexOfPanama());
33+
assertEquals(size - 1, bench.withOffsetIndexOf());
34+
assertEquals(size - 1, bench.withOffsetIndexPanama());
35+
}
36+
37+
@ParametersFactory
38+
public static Iterable<Object[]> parametersFactory() {
39+
try {
40+
var params = BytesArrayIndexOfBenchmark.class.getField("size").getAnnotationsByType(Param.class)[0].value();
41+
return () -> Arrays.stream(params).map(Integer::parseInt).map(i -> new Object[] { i }).iterator();
42+
} catch (NoSuchFieldException e) {
43+
throw new AssertionError(e);
44+
}
45+
}
46+
}

docs/changelog/135087.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135087
2+
summary: "Optimize `BytesArray::indexOf,` which is used heavily in ndjson parsing"
3+
area: "Performance"
4+
type: feature
5+
issues: []

libs/simdvec/src/main/java/org/elasticsearch/simdvec/ESVectorUtil.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.lang.invoke.MethodHandle;
2020
import java.lang.invoke.MethodHandles;
2121
import java.lang.invoke.MethodType;
22+
import java.util.Objects;
2223

2324
import static org.elasticsearch.simdvec.internal.vectorization.ESVectorUtilSupport.B_QUERY;
2425

@@ -399,4 +400,22 @@ public static void transposeHalfByte(int[] q, byte[] quantQueryByte) {
399400
}
400401
IMPL.transposeHalfByte(q, quantQueryByte);
401402
}
403+
404+
/**
405+
* Searches for the first occurrence of the given marker byte in the specified range of the array.
406+
*
407+
* <p>The search starts at {@code offset} and examines at most {@code length} bytes. The return
408+
* value is the relative index of the first occurrence of {@code marker} within this slice,
409+
* or {@code -1} if not found.
410+
*
411+
* @param bytes the byte array to search
412+
* @param offset the starting index within the array
413+
* @param length the number of bytes to examine
414+
* @param marker the byte to search for
415+
* @return the relative index (0..length-1) of the first match, or {@code -1} if not found
416+
*/
417+
public static int indexOf(byte[] bytes, int offset, int length, byte marker) {
418+
Objects.checkFromIndexSize(offset, length, bytes.length);
419+
return IMPL.indexOf(bytes, offset, length, marker);
420+
}
402421
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.simdvec.internal.vectorization;
11+
12+
import static org.apache.lucene.util.BitUtil.VH_LE_LONG;
13+
14+
/** Byte array utilities. */
15+
final class ByteArrayUtils {
16+
17+
/**
18+
* Implementation of {@link ESVectorUtilSupport#indexOf(byte[], int, int, byte)} using fast
19+
* SWAR (SIMD Within A Register) loop.
20+
*/
21+
static int indexOf(final byte[] bytes, final int offset, final int len, final byte marker) {
22+
final int end = offset + len;
23+
int i = offset;
24+
25+
// First, try to find the marker in the first few bytes, so we can enter the faster 8-byte aligned loop below.
26+
// The idea for this logic is taken from Netty's io.netty.buffer.ByteBufUtil.firstIndexOf and optimized for little endian hardware.
27+
// See e.g. https://richardstartin.github.io/posts/finding-bytes for the idea behind this optimization.
28+
final int byteCount = len & 7;
29+
if (byteCount > 0) {
30+
final int index = unrolledFirstIndexOf(bytes, i, byteCount, marker);
31+
if (index != -1) {
32+
return index - offset;
33+
}
34+
i += byteCount;
35+
if (i == end) {
36+
return -1;
37+
}
38+
}
39+
final int longCount = len >>> 3;
40+
// faster SWAR (SIMD Within A Register) loop
41+
final long pattern = compilePattern(marker);
42+
for (int j = 0; j < longCount; j++) {
43+
int index = findInLong(readLongLE(bytes, i), pattern);
44+
if (index < Long.BYTES) {
45+
return i + index - offset;
46+
}
47+
i += Long.BYTES;
48+
}
49+
return -1;
50+
}
51+
52+
private static long readLongLE(byte[] arr, int offset) {
53+
return (long) VH_LE_LONG.get(arr, offset);
54+
}
55+
56+
private static long compilePattern(byte byteToFind) {
57+
return (byteToFind & 0xFFL) * 0x101010101010101L;
58+
}
59+
60+
private static int findInLong(long word, long pattern) {
61+
long input = word ^ pattern;
62+
long tmp = (input & 0x7F7F7F7F7F7F7F7FL) + 0x7F7F7F7F7F7F7F7FL;
63+
tmp = ~(tmp | input | 0x7F7F7F7F7F7F7F7FL);
64+
final int binaryPosition = Long.numberOfTrailingZeros(tmp);
65+
return binaryPosition >>> 3;
66+
}
67+
68+
private static int unrolledFirstIndexOf(byte[] buffer, int fromIndex, int byteCount, byte value) {
69+
if (buffer[fromIndex] == value) {
70+
return fromIndex;
71+
}
72+
if (byteCount == 1) {
73+
return -1;
74+
}
75+
if (buffer[fromIndex + 1] == value) {
76+
return fromIndex + 1;
77+
}
78+
if (byteCount == 2) {
79+
return -1;
80+
}
81+
if (buffer[fromIndex + 2] == value) {
82+
return fromIndex + 2;
83+
}
84+
if (byteCount == 3) {
85+
return -1;
86+
}
87+
if (buffer[fromIndex + 3] == value) {
88+
return fromIndex + 3;
89+
}
90+
if (byteCount == 4) {
91+
return -1;
92+
}
93+
if (buffer[fromIndex + 4] == value) {
94+
return fromIndex + 4;
95+
}
96+
if (byteCount == 5) {
97+
return -1;
98+
}
99+
if (buffer[fromIndex + 5] == value) {
100+
return fromIndex + 5;
101+
}
102+
if (byteCount == 6) {
103+
return -1;
104+
}
105+
if (buffer[fromIndex + 6] == value) {
106+
return fromIndex + 6;
107+
}
108+
return -1;
109+
}
110+
}

libs/simdvec/src/main/java/org/elasticsearch/simdvec/internal/vectorization/DefaultESVectorUtilSupport.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,4 +403,9 @@ public static void transposeHalfByteImpl(int[] q, byte[] quantQueryByte) {
403403
quantQueryByte[index + quantQueryByte.length / 2] = (byte) upperMiddleByte;
404404
quantQueryByte[index + 3 * quantQueryByte.length / 4] = (byte) upperByte;
405405
}
406+
407+
@Override
408+
public int indexOf(byte[] bytes, int offset, int length, byte marker) {
409+
return ByteArrayUtils.indexOf(bytes, offset, length, marker);
410+
}
406411
}

libs/simdvec/src/main/java/org/elasticsearch/simdvec/internal/vectorization/ESVectorUtilSupport.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,6 @@ void soarDistanceBulk(
6767
void packAsBinary(int[] vector, byte[] packed);
6868

6969
void transposeHalfByte(int[] q, byte[] quantQueryByte);
70+
71+
int indexOf(byte[] bytes, int offset, int length, byte marker);
7072
}

libs/simdvec/src/main21/java/org/elasticsearch/simdvec/internal/vectorization/PanamaESVectorUtilSupport.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,4 +1122,26 @@ private void transposeHalfByte128(int[] q, byte[] quantQueryByte) {
11221122
quantQueryByte[index + quantQueryByte.length / 2] = (byte) upperMiddleByte;
11231123
quantQueryByte[index + 3 * quantQueryByte.length / 4] = (byte) upperByte;
11241124
}
1125+
1126+
@Override
1127+
public int indexOf(final byte[] bytes, final int offset, final int length, final byte marker) {
1128+
final ByteVector markerVector = ByteVector.broadcast(ByteVector.SPECIES_PREFERRED, marker);
1129+
final int loopBound = ByteVector.SPECIES_PREFERRED.loopBound(length);
1130+
for (int i = 0; i < loopBound; i += ByteVector.SPECIES_PREFERRED.length()) {
1131+
ByteVector chunk = ByteVector.fromArray(ByteVector.SPECIES_PREFERRED, bytes, offset + i);
1132+
VectorMask<Byte> mask = chunk.eq(markerVector);
1133+
if (mask.anyTrue()) {
1134+
return i + mask.firstTrue();
1135+
}
1136+
}
1137+
// tail
1138+
if (loopBound < length) {
1139+
int remaining = length - loopBound;
1140+
int tail = ByteArrayUtils.indexOf(bytes, offset + loopBound, remaining, marker);
1141+
if (tail >= 0) {
1142+
return loopBound + tail;
1143+
}
1144+
}
1145+
return -1;
1146+
}
11251147
}

0 commit comments

Comments
 (0)