Skip to content

Commit 360e368

Browse files
committed
Change
1 parent db2e9e1 commit 360e368

File tree

5 files changed

+43
-53
lines changed

5 files changed

+43
-53
lines changed

server/src/main/java/org/elasticsearch/common/bytes/BytesArray.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111

1212
import org.apache.lucene.util.BytesRef;
1313
import org.apache.lucene.util.BytesRefIterator;
14-
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1514
import org.elasticsearch.common.io.stream.StreamInput;
16-
import org.elasticsearch.common.io.stream.StreamOutput;
1715
import org.elasticsearch.common.util.ByteUtils;
1816

1917
import java.io.IOException;
@@ -224,11 +222,7 @@ public StreamInput streamInput() {
224222

225223
@Override
226224
public void writeTo(OutputStream os) throws IOException {
227-
if (os instanceof StreamOutput output) {
228-
output.writeBytes(bytes, offset, length);
229-
} else {
230-
os.write(bytes, offset, length);
231-
}
225+
os.write(bytes, offset, length);
232226
}
233227

234228
@Override

server/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.common.breaker.CircuitBreaker;
1717
import org.elasticsearch.common.breaker.CircuitBreakingException;
1818
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
19-
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2019
import org.elasticsearch.common.io.stream.StreamInput;
2120
import org.elasticsearch.common.io.stream.StreamOutput;
2221
import org.elasticsearch.common.recycler.Recycler;
@@ -25,7 +24,6 @@
2524
import org.elasticsearch.core.Releasables;
2625
import org.elasticsearch.core.Streams;
2726
import org.elasticsearch.indices.breaker.CircuitBreakerService;
28-
import org.elasticsearch.transport.BytesRefRecycler;
2927

3028
import java.io.IOException;
3129
import java.io.InputStream;
@@ -472,7 +470,6 @@ public T getAndSet(long index, T value) {
472470

473471
@Nullable
474472
final PageCacheRecycler recycler;
475-
final BytesRefRecycler bytesRefRecycler;
476473
@Nullable
477474
private final CircuitBreakerService breakerService;
478475
@Nullable
@@ -494,7 +491,6 @@ protected BigArrays(
494491
) {
495492
this.checkBreaker = checkBreaker;
496493
this.recycler = recycler;
497-
this.bytesRefRecycler = recycler != null ? new BytesRefRecycler(recycler) : BytesRefRecycler.NON_RECYCLING_INSTANCE;
498494
this.breakerService = breakerService;
499495
if (breakerService != null) {
500496
breaker = breakerService.getBreaker(breakerName);
@@ -592,10 +588,6 @@ private <T extends BigArray> T validate(T array) {
592588
return array;
593589
}
594590

595-
public RecyclerBytesStreamOutput newRecyclerStreamOutput() {
596-
return new RecyclerBytesStreamOutput(bytesRefRecycler);
597-
}
598-
599591
/**
600592
* Allocate a new {@link ByteArray}.
601593
* @param size the initial length of the array

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.common.bytes.BytesReference;
1818
import org.elasticsearch.common.io.DiskIoBufferPool;
1919
import org.elasticsearch.common.io.stream.BytesStreamOutput;
20-
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
20+
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
2121
import org.elasticsearch.common.io.stream.StreamInput;
2222
import org.elasticsearch.common.io.stream.StreamOutput;
2323
import org.elasticsearch.common.io.stream.Writeable;
@@ -608,7 +608,7 @@ TranslogWriter createWriter(
608608
* @throws IOException if adding the operation to the translog resulted in an I/O exception
609609
*/
610610
public Location add(final Operation operation) throws IOException {
611-
try (RecyclerBytesStreamOutput out = bigArrays.newRecyclerStreamOutput()) {
611+
try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
612612
writeOperationWithSize(out, operation);
613613
final BytesReference bytes = out.bytes();
614614
readLock.lock();
@@ -1640,7 +1640,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
16401640
out.writeInt((int) checksum);
16411641
}
16421642

1643-
public static void writeOperationWithSize(RecyclerBytesStreamOutput out, Translog.Operation op) throws IOException {
1643+
public static void writeOperationWithSize(BytesStreamOutput out, Translog.Operation op) throws IOException {
16441644
final long start = out.position();
16451645
out.skip(Integer.BYTES);
16461646
writeOperationNoSize(new BufferedChecksumStreamOutput(out), op);

server/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1818
import org.elasticsearch.common.io.Channels;
1919
import org.elasticsearch.common.io.DiskIoBufferPool;
20-
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2120
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
2221
import org.elasticsearch.common.unit.ByteSizeValue;
2322
import org.elasticsearch.common.util.BigArrays;
@@ -82,7 +81,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable {
8281
private List<Long> nonFsyncedSequenceNumbers = new ArrayList<>(64);
8382
private final int forceWriteThreshold;
8483
private volatile long bufferedBytes;
85-
private RecyclerBytesStreamOutput buffer;
84+
private ReleasableBytesStreamOutput buffer;
8685

8786
private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;
8887

@@ -245,7 +244,7 @@ public Translog.Location add(final BytesReference data, final long seqNo) throws
245244
synchronized (this) {
246245
ensureOpen();
247246
if (buffer == null) {
248-
buffer = bigArrays.newRecyclerStreamOutput();
247+
buffer = new ReleasableBytesStreamOutput(bigArrays);
249248
}
250249
assert bufferedBytes == buffer.size();
251250
final long offset = totalOffset;
@@ -549,11 +548,10 @@ private void writeBufferedOps(long offset, boolean blockOnExistingWriter) throws
549548
private synchronized ReleasableBytesReference pollOpsToWrite() {
550549
ensureOpen();
551550
if (this.buffer != null) {
552-
try (RecyclerBytesStreamOutput toWrite = this.buffer) {
553-
this.buffer = null;
554-
this.bufferedBytes = 0;
555-
return toWrite.moveToBytesReference();
556-
}
551+
ReleasableBytesStreamOutput toWrite = this.buffer;
552+
this.buffer = null;
553+
this.bufferedBytes = 0;
554+
return new ReleasableBytesReference(toWrite.bytes(), toWrite);
557555
} else {
558556
return ReleasableBytesReference.empty();
559557
}

server/src/main/java/org/elasticsearch/ingest/ESONFlat.java

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -96,43 +96,49 @@ public BytesReference getSerializedKeyBytes() {
9696
return serializedKeyBytes.get();
9797
}
9898

99-
private static final ThreadLocal<byte[]> BYTES_REF = ThreadLocal.withInitial(() -> new byte[16384]);
99+
private static final ThreadLocal<BytesRef> BYTES_REF = ThreadLocal.withInitial(() -> new BytesRef(new byte[16384]));
100100

101101
public static Recycler<BytesRef> getBytesRefRecycler() {
102-
return new Recycler<>() {
102+
return new ThreadLocalRecycler();
103+
}
103104

104-
private boolean first = false;
105+
private static class ThreadLocalRecycler implements Recycler<BytesRef> {
105106

106-
@Override
107-
public V<BytesRef> obtain() {
108-
final BytesRef bytesRef;
109-
if (first) {
110-
first = false;
111-
bytesRef = new BytesRef(BYTES_REF.get());
112-
} else {
113-
bytesRef = new BytesRef(new byte[16384]);
114-
}
115-
return new V<>() {
107+
private boolean first = true;
116108

117-
@Override
118-
public BytesRef v() {
119-
return bytesRef;
120-
}
109+
@Override
110+
public V<BytesRef> obtain() {
111+
final BytesRef bytesRef;
112+
if (first) {
113+
first = false;
114+
bytesRef = BYTES_REF.get();
115+
bytesRef.offset = 0;
116+
bytesRef.length = bytesRef.bytes.length;
117+
} else {
118+
bytesRef = new BytesRef(new byte[16384]);
119+
}
120+
return new VImpl(bytesRef);
121+
}
121122

122-
@Override
123-
public boolean isRecycled() {
124-
return false;
125-
}
123+
private record VImpl(BytesRef bytesRef) implements V<BytesRef> {
126124

127-
@Override
128-
public void close() {}
129-
};
125+
@Override
126+
public BytesRef v() {
127+
return bytesRef;
130128
}
131129

132130
@Override
133-
public int pageSize() {
134-
return 16384;
131+
public boolean isRecycled() {
132+
return false;
135133
}
136-
};
134+
135+
@Override
136+
public void close() {}
137+
}
138+
139+
@Override
140+
public int pageSize() {
141+
return 16384;
142+
}
137143
}
138144
}

0 commit comments

Comments
 (0)