Skip to content

Commit cadb7e9

Browse files
authored
Use recycler bytestream output for translog writer (elastic#135161)
Use the recycler bytestream output for translog writer. This variant allows direct access to the current page and variable page sizes which will additional optimizations down the line.
1 parent 6316494 commit cadb7e9

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

server/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.core.Releasables;
2525
import org.elasticsearch.core.Streams;
2626
import org.elasticsearch.indices.breaker.CircuitBreakerService;
27+
import org.elasticsearch.transport.BytesRefRecycler;
2728

2829
import java.io.IOException;
2930
import java.io.InputStream;
@@ -470,6 +471,7 @@ public T getAndSet(long index, T value) {
470471

471472
@Nullable
472473
final PageCacheRecycler recycler;
474+
final BytesRefRecycler bytesRefRecycler;
473475
@Nullable
474476
private final CircuitBreakerService breakerService;
475477
@Nullable
@@ -491,6 +493,7 @@ protected BigArrays(
491493
) {
492494
this.checkBreaker = checkBreaker;
493495
this.recycler = recycler;
496+
this.bytesRefRecycler = recycler != null ? new BytesRefRecycler(recycler) : BytesRefRecycler.NON_RECYCLING_INSTANCE;
494497
this.breakerService = breakerService;
495498
if (breakerService != null) {
496499
breaker = breakerService.getBreaker(breakerName);
@@ -588,6 +591,10 @@ private <T extends BigArray> T validate(T array) {
588591
return array;
589592
}
590593

594+
public BytesRefRecycler bytesRefRecycler() {
595+
return bytesRefRecycler;
596+
}
597+
591598
/**
592599
* Allocate a new {@link ByteArray}.
593600
* @param size the initial length of the array

server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1818
import org.elasticsearch.common.io.Channels;
1919
import org.elasticsearch.common.io.DiskIoBufferPool;
20-
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
20+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2121
import org.elasticsearch.common.unit.ByteSizeValue;
2222
import org.elasticsearch.common.util.BigArrays;
2323
import org.elasticsearch.common.util.concurrent.ReleasableLock;
@@ -82,7 +82,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
8282
private List<Long> nonFsyncedSequenceNumbers = new ArrayList<>(64);
8383
private final int forceWriteThreshold;
8484
private volatile long bufferedBytes;
85-
private ReleasableBytesStreamOutput buffer;
85+
private RecyclerBytesStreamOutput buffer;
8686

8787
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
8888

@@ -245,7 +245,7 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws
245245
synchronized (this) {
246246
ensureOpen();
247247
if (buffer == null) {
248-
buffer = new ReleasableBytesStreamOutput(bigArrays);
248+
buffer = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler());
249249
}
250250
assert bufferedBytes == buffer.size();
251251
final long offset = totalOffset;
@@ -544,10 +544,11 @@ private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws
544544
private synchronized ReleasableBytesReference pollOpsToWrite() {
545545
ensureOpen();
546546
if (this.buffer != null) {
547-
ReleasableBytesStreamOutput toWrite = this.buffer;
548-
this.buffer = null;
549-
this.bufferedBytes = 0;
550-
return new ReleasableBytesReference(toWrite.bytes(), toWrite);
547+
try (RecyclerBytesStreamOutput toWrite = this.buffer) {
548+
this.buffer = null;
549+
this.bufferedBytes = 0;
550+
return toWrite.moveToBytesReference();
551+
}
551552
} else {
552553
return ReleasableBytesReference.empty();
553554
}

0 commit comments

Comments
 (0)