Skip to content

Commit 3e7e63e

Browse files
authored
Add direct access to current recycler stream page (#135114)
This change makes a number of minor optimizations to the recycler bytes stream. The most important change is that it allow cached direct access to the current page. This helps in most scenarios where are write does not cross page boundaries. Additionally, it enables future subclasses to implement custom serialization directly to the page with minimal bounds checks. Finally, it always creates the first page in the ctor to remove guaranteed expand calls in the first stream write.
1 parent ff70916 commit 3e7e63e

File tree

4 files changed

+753
-49
lines changed

4 files changed

+753
-49
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.bytes;
11+
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
14+
import org.elasticsearch.common.recycler.Recycler;
15+
import org.openjdk.jmh.annotations.Benchmark;
16+
import org.openjdk.jmh.annotations.BenchmarkMode;
17+
import org.openjdk.jmh.annotations.Fork;
18+
import org.openjdk.jmh.annotations.Measurement;
19+
import org.openjdk.jmh.annotations.Mode;
20+
import org.openjdk.jmh.annotations.OutputTimeUnit;
21+
import org.openjdk.jmh.annotations.Scope;
22+
import org.openjdk.jmh.annotations.Setup;
23+
import org.openjdk.jmh.annotations.State;
24+
import org.openjdk.jmh.annotations.Warmup;
25+
26+
import java.io.IOException;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
31+
@Warmup(iterations = 3)
32+
@Measurement(iterations = 3)
33+
@BenchmarkMode(Mode.AverageTime)
34+
@OutputTimeUnit(TimeUnit.NANOSECONDS)
35+
@State(Scope.Thread)
36+
@Fork(value = 1)
37+
public class RecyclerBytesStreamOutputBenchmark {
38+
39+
private final AtomicReference<BytesRef> bytesRef = new AtomicReference<>(new BytesRef(16384));
40+
private RecyclerBytesStreamOutput streamOutput;
41+
private String shortString;
42+
private String longString;
43+
private String nonAsciiString;
44+
private String veryLongString;
45+
private byte[] bytes1;
46+
private byte[] bytes2;
47+
private byte[] bytes3;
48+
private byte[] multiPageBytes;
49+
private int[] vints;
50+
51+
@Setup
52+
public void initResults() throws IOException {
53+
streamOutput = new RecyclerBytesStreamOutput(new BenchmarkRecycler(bytesRef));
54+
ThreadLocalRandom random = ThreadLocalRandom.current();
55+
56+
bytes1 = new byte[327];
57+
bytes2 = new byte[712];
58+
bytes3 = new byte[1678];
59+
multiPageBytes = new byte[16387 * 4];
60+
random.nextBytes(bytes1);
61+
random.nextBytes(bytes2);
62+
random.nextBytes(bytes3);
63+
random.nextBytes(multiPageBytes);
64+
65+
// We use weights to generate certain sized UTF-8 characters and vInts. However, there is still some non-determinism which could
66+
// impact direct comparisons run-to-run
67+
68+
shortString = generateAsciiString(20);
69+
longString = generateAsciiString(100);
70+
nonAsciiString = generateUtf8String(200);
71+
veryLongString = generateAsciiString(800);
72+
// vint values for benchmarking
73+
vints = new int[1000];
74+
for (int i = 0; i < vints.length; i++) {
75+
if (random.nextBoolean()) {
76+
// 1-byte 50% of the time
77+
vints[i] = random.nextInt(128);
78+
} else if (random.nextBoolean()) {
79+
// 2-byte 25% of the time
80+
vints[i] = random.nextInt(128, 16384);
81+
} else {
82+
if (random.nextBoolean()) {
83+
// 3-byte vints
84+
vints[i] = random.nextInt(16384, 2097152);
85+
} else {
86+
// All vint variants
87+
vints[i] = random.nextInt();
88+
}
89+
}
90+
}
91+
}
92+
93+
@Benchmark
94+
public void writeByte() throws IOException {
95+
streamOutput.seek(1);
96+
for (byte item : bytes1) {
97+
streamOutput.writeByte(item);
98+
}
99+
for (byte item : bytes2) {
100+
streamOutput.writeByte(item);
101+
}
102+
for (byte item : bytes3) {
103+
streamOutput.writeByte(item);
104+
}
105+
}
106+
107+
@Benchmark
108+
public void writeBytes() throws IOException {
109+
streamOutput.seek(1);
110+
streamOutput.writeBytes(bytes1, 0, bytes1.length);
111+
streamOutput.writeBytes(bytes2, 0, bytes2.length);
112+
streamOutput.writeBytes(bytes3, 0, bytes3.length);
113+
}
114+
115+
@Benchmark
116+
public void writeBytesAcrossPageBoundary() throws IOException {
117+
streamOutput.seek(16384 - 1000);
118+
streamOutput.writeBytes(bytes1, 0, bytes1.length);
119+
streamOutput.writeBytes(bytes2, 0, bytes2.length);
120+
streamOutput.writeBytes(bytes3, 0, bytes3.length);
121+
}
122+
123+
@Benchmark
124+
public void writeBytesMultiPage() throws IOException {
125+
streamOutput.seek(16384 - 1000);
126+
streamOutput.writeBytes(multiPageBytes, 0, multiPageBytes.length);
127+
}
128+
129+
@Benchmark
130+
public void writeString() throws IOException {
131+
streamOutput.seek(1);
132+
streamOutput.writeString(shortString);
133+
streamOutput.writeString(longString);
134+
streamOutput.writeString(nonAsciiString);
135+
streamOutput.writeString(veryLongString);
136+
}
137+
138+
@Benchmark
139+
public void writeVInt() throws IOException {
140+
streamOutput.seek(1);
141+
for (int vint : vints) {
142+
streamOutput.writeVInt(vint);
143+
}
144+
}
145+
146+
public static String generateAsciiString(int n) {
147+
ThreadLocalRandom random = ThreadLocalRandom.current();
148+
StringBuilder sb = new StringBuilder(n);
149+
150+
for (int i = 0; i < n; i++) {
151+
int ascii = random.nextInt(128);
152+
sb.append((char) ascii);
153+
}
154+
155+
return sb.toString();
156+
}
157+
158+
public static String generateUtf8String(int n) {
159+
ThreadLocalRandom random = ThreadLocalRandom.current();
160+
StringBuilder sb = new StringBuilder(n);
161+
162+
for (int i = 0; i < n; i++) {
163+
int codePoint;
164+
int probability = random.nextInt(100);
165+
166+
if (probability < 85) {
167+
// 1-byte UTF-8 (ASCII range)
168+
// 0x0000 to 0x007F
169+
codePoint = random.nextInt(0x0080);
170+
} else if (probability < 95) {
171+
// 2-byte UTF-8
172+
// 0x0080 to 0x07FF
173+
codePoint = random.nextInt(0x0080, 0x0800);
174+
} else {
175+
// 3-byte UTF-8
176+
// 0x0800 to 0xFFFF
177+
do {
178+
codePoint = random.nextInt(0x0800, 0x10000);
179+
// Skip surrogate pairs (0xD800-0xDFFF)
180+
} while (codePoint >= 0xD800 && codePoint <= 0xDFFF);
181+
}
182+
183+
sb.appendCodePoint(codePoint);
184+
}
185+
186+
return sb.toString();
187+
}
188+
189+
private record BenchmarkRecycler(AtomicReference<BytesRef> bytesRef) implements Recycler<BytesRef> {
190+
191+
@Override
192+
public V<BytesRef> obtain() {
193+
BytesRef recycledBytesRef = bytesRef.getAndSet(null);
194+
final BytesRef localBytesRef;
195+
final boolean recycled;
196+
if (recycledBytesRef != null) {
197+
recycled = true;
198+
localBytesRef = recycledBytesRef;
199+
} else {
200+
recycled = false;
201+
localBytesRef = new BytesRef(16384);
202+
}
203+
return new V<>() {
204+
@Override
205+
public BytesRef v() {
206+
return localBytesRef;
207+
}
208+
209+
@Override
210+
public boolean isRecycled() {
211+
return recycled;
212+
}
213+
214+
@Override
215+
public void close() {
216+
if (recycled) {
217+
bytesRef.set(localBytesRef);
218+
}
219+
}
220+
};
221+
}
222+
223+
@Override
224+
public int pageSize() {
225+
return 16384;
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)