Skip to content

Commit a4dfd29

Browse files
committed
Changes
1 parent 7046e51 commit a4dfd29

File tree

4 files changed

+134
-60
lines changed

4 files changed

+134
-60
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

Lines changed: 117 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,23 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
3838
static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
3939
static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);
4040

41-
private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
41+
private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>(8);
4242
private final Recycler<BytesRef> recycler;
4343
private final int pageSize;
4444
private int pageIndex = -1;
4545
private int currentCapacity = 0;
4646
private int currentPageOffset;
4747

48+
private byte[] bytesRefBytes;
49+
private int bytesRefOffset;
50+
4851
public RecyclerBytesStreamOutput(Recycler<BytesRef> recycler) {
4952
this.recycler = recycler;
5053
this.pageSize = recycler.pageSize();
5154
this.currentPageOffset = pageSize;
55+
// Always start with a page
56+
ensureCapacityFromPosition(1);
57+
nextPage();
5258
}
5359

5460
@Override
@@ -58,14 +64,12 @@ public long position() {
5864

5965
@Override
6066
public void writeByte(byte b) {
61-
int currentPageOffset = this.currentPageOffset;
62-
if (1 > (pageSize - currentPageOffset)) {
67+
if (currentPageOffset == pageSize) {
6368
ensureCapacity(1);
64-
currentPageOffset = 0;
69+
nextPage();
6570
}
66-
BytesRef currentPage = pages.get(pageIndex).v();
67-
currentPage.bytes[currentPage.offset + currentPageOffset] = b;
68-
this.currentPageOffset = currentPageOffset + 1;
71+
bytesRefBytes[bytesRefOffset + currentPageOffset] = b;
72+
++currentPageOffset;
6973
}
7074

7175
@Override
@@ -77,36 +81,69 @@ public void writeBytes(byte[] b, int offset, int length) {
7781

7882
Objects.checkFromIndexSize(offset, length, b.length);
7983

80-
// get enough pages for new size
81-
final int pageSize = this.pageSize;
82-
int currentPageOffset = this.currentPageOffset;
83-
if (length > pageSize - currentPageOffset) {
84-
ensureCapacity(length);
85-
currentPageOffset = this.currentPageOffset;
84+
int toCopy = Math.min(length, pageSize - currentPageOffset);
85+
if (toCopy != 0) {
86+
System.arraycopy(b, offset, bytesRefBytes, bytesRefOffset + currentPageOffset, toCopy);
87+
currentPageOffset += toCopy;
88+
if (toCopy == length) {
89+
return;
90+
}
8691
}
8792

88-
// bulk copy
93+
writeAdditionalPages(b, offset + toCopy, length - toCopy);
94+
}
95+
96+
private void writeAdditionalPages(byte[] b, int offset, int length) {
97+
ensureCapacity(length);
98+
8999
int bytesToCopy = length;
90-
int srcOff = offset;
91-
int j = 0;
92-
while (true) {
93-
BytesRef currentPage = pages.get(pageIndex + j).v();
100+
int srcOffset = offset;
101+
102+
while (bytesToCopy > 0) {
103+
if (currentPageOffset == pageSize) {
104+
nextPage();
105+
}
106+
94107
int toCopyThisLoop = Math.min(pageSize - currentPageOffset, bytesToCopy);
95-
System.arraycopy(b, srcOff, currentPage.bytes, currentPage.offset + currentPageOffset, toCopyThisLoop);
96-
srcOff += toCopyThisLoop;
108+
System.arraycopy(b, srcOffset, bytesRefBytes, bytesRefOffset + currentPageOffset, toCopyThisLoop);
109+
110+
srcOffset += toCopyThisLoop;
97111
bytesToCopy -= toCopyThisLoop;
98-
if (bytesToCopy > 0) {
99-
currentPageOffset = 0;
100-
} else {
101-
currentPageOffset += toCopyThisLoop;
102-
break;
103-
}
104-
j++;
112+
currentPageOffset += toCopyThisLoop;
113+
}
114+
}
115+
116+
@Override
117+
public void writeVInt(int i) throws IOException {
118+
int bytesNeeded = vIntLength(i);
119+
if (bytesNeeded > pageSize - currentPageOffset) {
120+
super.writeVInt(i);
121+
} else {
122+
putVInt(i, bytesNeeded);
105123
}
106-
this.currentPageOffset = currentPageOffset;
124+
}
107125

108-
// advance
109-
pageIndex += j;
126+
private static int vIntLength(int value) {
127+
int leadingZeros = Integer.numberOfLeadingZeros(value);
128+
if (leadingZeros >= 25) {
129+
return 1;
130+
} else if (leadingZeros >= 18) {
131+
return 2;
132+
} else if (leadingZeros >= 11) {
133+
return 3;
134+
} else if (leadingZeros >= 4) {
135+
return 4;
136+
}
137+
return 5;
138+
}
139+
140+
private void putVInt(int i, int bytesNeeded) {
141+
if (bytesNeeded == 1) {
142+
bytesRefBytes[bytesRefOffset + currentPageOffset] = (byte) i;
143+
currentPageOffset += 1;
144+
} else {
145+
currentPageOffset += putMultiByteVInt(bytesRefBytes, i, bytesRefOffset + currentPageOffset);
146+
}
110147
}
111148

112149
@Override
@@ -115,8 +152,7 @@ public void writeInt(int i) throws IOException {
115152
if (4 > (pageSize - currentPageOffset)) {
116153
super.writeInt(i);
117154
} else {
118-
BytesRef currentPage = pages.get(pageIndex).v();
119-
VH_BE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
155+
VH_BE_INT.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
120156
this.currentPageOffset = currentPageOffset + 4;
121157
}
122158
}
@@ -126,8 +162,7 @@ public void writeIntLE(int i) throws IOException {
126162
if (4 > (pageSize - currentPageOffset)) {
127163
super.writeIntLE(i);
128164
} else {
129-
BytesRef currentPage = pages.get(pageIndex).v();
130-
VH_LE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
165+
VH_LE_INT.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
131166
currentPageOffset += 4;
132167
}
133168
}
@@ -138,8 +173,7 @@ public void writeLong(long i) throws IOException {
138173
if (8 > (pageSize - currentPageOffset)) {
139174
super.writeLong(i);
140175
} else {
141-
BytesRef currentPage = pages.get(pageIndex).v();
142-
VH_BE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
176+
VH_BE_LONG.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
143177
this.currentPageOffset = currentPageOffset + 8;
144178
}
145179
}
@@ -149,8 +183,7 @@ public void writeLongLE(long i) throws IOException {
149183
if (8 > (pageSize - currentPageOffset)) {
150184
super.writeLongLE(i);
151185
} else {
152-
BytesRef currentPage = pages.get(pageIndex).v();
153-
VH_LE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
186+
VH_LE_LONG.set(bytesRefBytes, bytesRefOffset + currentPageOffset, i);
154187
currentPageOffset += 8;
155188
}
156189
}
@@ -189,9 +222,10 @@ public void writeString(String str) throws IOException {
189222
super.writeString(str);
190223
return;
191224
}
192-
BytesRef currentPage = pages.get(pageIndex).v();
193-
int off = currentPage.offset + currentPageOffset;
194-
byte[] buffer = currentPage.bytes;
225+
226+
int off = bytesRefOffset + currentPageOffset;
227+
byte[] buffer = bytesRefBytes;
228+
195229
// mostly duplicated from StreamOutput.writeString to to get more reliable compilation of this very hot loop
196230
int offset = off + putVInt(buffer, charCount, off);
197231
for (int i = 0; i < charCount; i++) {
@@ -207,7 +241,7 @@ public void writeString(String str) throws IOException {
207241
buffer[offset++] = ((byte) (0x80 | c >> 0 & 0x3F));
208242
}
209243
}
210-
this.currentPageOffset = offset - currentPage.offset;
244+
this.currentPageOffset = offset - bytesRefOffset;
211245
}
212246

213247
@Override
@@ -220,6 +254,7 @@ public void seek(long position) {
220254
ensureCapacityFromPosition(position);
221255
int offsetInPage = (int) (position % pageSize);
222256
int pageIndex = (int) position / pageSize;
257+
223258
// Special handling for seeking to the first index in a new page, which is handled as a seeking to one-after the last index
224259
// in the previous case. This is done so that seeking to the first index of a new page does not cause a page allocation while
225260
// still allowing a fast check via (pageSize - currentPageOffset) on the remaining size in the current page in all other methods.
@@ -230,6 +265,14 @@ public void seek(long position) {
230265
this.pageIndex = pageIndex;
231266
this.currentPageOffset = offsetInPage;
232267
}
268+
if (position != 0) {
269+
BytesRef page = pages.get(this.pageIndex).v();
270+
this.bytesRefBytes = page.bytes;
271+
this.bytesRefOffset = page.offset;
272+
} else {
273+
this.bytesRefBytes = null;
274+
this.bytesRefOffset = 0;
275+
}
233276
}
234277

235278
public void skip(int length) {
@@ -241,6 +284,8 @@ public void close() {
241284
var pages = this.pages;
242285
if (pages != null) {
243286
this.pages = null;
287+
288+
this.bytesRefBytes = null;
244289
Releasables.close(pages);
245290
}
246291
}
@@ -255,6 +300,8 @@ public ReleasableBytesReference moveToBytesReference() {
255300
var bytes = bytes();
256301
var pages = this.pages;
257302
this.pages = null;
303+
304+
this.bytesRefBytes = null;
258305
return new ReleasableBytesReference(bytes, () -> Releasables.close(pages));
259306
}
260307

@@ -312,16 +359,34 @@ private void ensureCapacityFromPosition(long newPosition) {
312359
if (newPosition > Integer.MAX_VALUE - (Integer.MAX_VALUE % pageSize)) {
313360
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
314361
}
315-
while (newPosition > currentCapacity) {
316-
Recycler.V<BytesRef> newPage = recycler.obtain();
317-
assert pageSize == newPage.v().length;
318-
pages.add(newPage);
319-
currentCapacity += pageSize;
320-
}
321-
// We are at the end of the current page, increment page index
322-
if (currentPageOffset == pageSize) {
323-
pageIndex++;
324-
currentPageOffset = 0;
362+
363+
long additionalCapacityNeeded = newPosition - currentCapacity;
364+
if (additionalCapacityNeeded > 0) {
365+
if (additionalCapacityNeeded <= pageSize) {
366+
Recycler.V<BytesRef> newPage = recycler.obtain();
367+
assert pageSize == newPage.v().length;
368+
pages.add(newPage);
369+
currentCapacity += pageSize;
370+
} else {
371+
// Calculate number of additional pages needed
372+
int additionalPagesNeeded = (int) ((additionalCapacityNeeded + pageSize - 1) / pageSize);
373+
pages.ensureCapacity(pages.size() + additionalPagesNeeded);
374+
for (int i = 0; i < additionalPagesNeeded; i++) {
375+
Recycler.V<BytesRef> newPage = recycler.obtain();
376+
assert pageSize == newPage.v().length;
377+
pages.add(newPage);
378+
}
379+
currentCapacity += additionalPagesNeeded * pageSize;
380+
}
325381
}
326382
}
383+
384+
private void nextPage() {
385+
pageIndex++;
386+
currentPageOffset = 0;
387+
388+
BytesRef page = pages.get(pageIndex).v();
389+
bytesRefBytes = page.bytes;
390+
bytesRefOffset = page.offset;
391+
}
327392
}

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public static int putVInt(byte[] buffer, int i, int off) {
254254
return putMultiByteVInt(buffer, i, off);
255255
}
256256

257-
private static int putMultiByteVInt(byte[] buffer, int i, int off) {
257+
protected static int putMultiByteVInt(byte[] buffer, int i, int off) {
258258
int index = off;
259259
do {
260260
buffer[index++] = ((byte) ((i & 0x7f) | 0x80));

server/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.breaker.CircuitBreaker;
1717
import org.elasticsearch.common.breaker.CircuitBreakingException;
1818
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
19+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
1920
import org.elasticsearch.common.io.stream.StreamInput;
2021
import org.elasticsearch.common.io.stream.StreamOutput;
2122
import org.elasticsearch.common.recycler.Recycler;
@@ -24,6 +25,7 @@
2425
import org.elasticsearch.core.Releasables;
2526
import org.elasticsearch.core.Streams;
2627
import org.elasticsearch.indices.breaker.CircuitBreakerService;
28+
import org.elasticsearch.transport.BytesRefRecycler;
2729

2830
import java.io.IOException;
2931
import java.io.InputStream;
@@ -470,6 +472,7 @@ public T getAndSet(long index, T value) {
470472

471473
@Nullable
472474
final PageCacheRecycler recycler;
475+
final BytesRefRecycler bytesRefRecycler;
473476
@Nullable
474477
private final CircuitBreakerService breakerService;
475478
@Nullable
@@ -491,6 +494,7 @@ protected BigArrays(
491494
) {
492495
this.checkBreaker = checkBreaker;
493496
this.recycler = recycler;
497+
this.bytesRefRecycler = recycler != null ? new BytesRefRecycler(recycler) : BytesRefRecycler.NON_RECYCLING_INSTANCE;
494498
this.breakerService = breakerService;
495499
if (breakerService != null) {
496500
breaker = breakerService.getBreaker(breakerName);
@@ -588,6 +592,10 @@ private <T extends BigArray> T validate(T array) {
588592
return array;
589593
}
590594

595+
public BytesRefRecycler bytesRefRecycler() {
596+
return bytesRefRecycler;
597+
}
598+
591599
/**
592600
* Allocate a new {@link ByteArray}.
593601
* @param size the initial length of the array

server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1818
import org.elasticsearch.common.io.Channels;
1919
import org.elasticsearch.common.io.DiskIoBufferPool;
20-
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
20+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2121
import org.elasticsearch.common.unit.ByteSizeValue;
2222
import org.elasticsearch.common.util.BigArrays;
2323
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -82,7 +82,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
8282
private List<Long> nonFsyncedSequenceNumbers = new ArrayList<>(64);
8383
private final int forceWriteThreshold;
8484
private volatile long bufferedBytes;
85-
private ReleasableBytesStreamOutput buffer;
85+
private RecyclerBytesStreamOutput buffer;
8686

8787
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
8888

@@ -245,7 +245,7 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws
245245
synchronized (this) {
246246
ensureOpen();
247247
if (buffer == null) {
248-
buffer = new ReleasableBytesStreamOutput(bigArrays);
248+
buffer = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler());
249249
}
250250
assert bufferedBytes == buffer.size();
251251
final long offset = totalOffset;
@@ -544,10 +544,11 @@ private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws
544544
private synchronized ReleasableBytesReference pollOpsToWrite() {
545545
ensureOpen();
546546
if (this.buffer != null) {
547-
ReleasableBytesStreamOutput toWrite = this.buffer;
548-
this.buffer = null;
549-
this.bufferedBytes = 0;
550-
return new ReleasableBytesReference(toWrite.bytes(), toWrite);
547+
try (RecyclerBytesStreamOutput toWrite = this.buffer) {
548+
this.buffer = null;
549+
this.bufferedBytes = 0;
550+
return toWrite.moveToBytesReference();
551+
}
551552
} else {
552553
return ReleasableBytesReference.empty();
553554
}

0 commit comments

Comments
 (0)