Skip to content

Commit fdb7d31

Browse files
committed
Change
1 parent f9d9127 commit fdb7d31

File tree

3 files changed

+45
-5
lines changed

3 files changed

+45
-5
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.ByteOrder;
2626
import java.util.ArrayList;
2727
import java.util.Objects;
28+
import java.util.zip.Checksum;
2829

2930
/**
3031
* A @link {@link StreamOutput} that uses {@link Recycler.V<BytesRef>} to acquire pages of bytes, which
@@ -348,6 +349,42 @@ public BytesReference bytes() {
348349
}
349350
}
350351

352+
public void calculateChecksum(Checksum checksum, int startPosition) {
353+
int position = (int) position();
354+
Objects.checkIndex(startPosition, position);
355+
356+
int bytesToProcess = position - startPosition;
357+
if (bytesToProcess == 0) {
358+
return;
359+
}
360+
361+
int startPageIndex = startPosition / pageSize;
362+
int startPageOffset = startPosition % pageSize;
363+
364+
final int remainder = position % pageSize;
365+
final int bytesInLastPage = remainder != 0 ? remainder : pageSize;
366+
final int endPageIndex = (position - 1) / pageSize;
367+
368+
if (startPageIndex == endPageIndex) {
369+
BytesRef page = pages.get(startPageIndex).v();
370+
checksum.update(page.bytes, page.offset + startPageOffset, bytesToProcess);
371+
} else {
372+
BytesRef firstPage = pages.get(startPageIndex).v();
373+
int firstPageBytes = pageSize - startPageOffset;
374+
checksum.update(firstPage.bytes, firstPage.offset + startPageOffset, firstPageBytes);
375+
376+
for (int i = startPageIndex + 1; i < endPageIndex; i++) {
377+
BytesRef page = pages.get(i).v();
378+
checksum.update(page.bytes, page.offset, pageSize);
379+
}
380+
381+
if (endPageIndex > startPageIndex) {
382+
BytesRef lastPage = pages.get(endPageIndex).v();
383+
checksum.update(lastPage.bytes, lastPage.offset, bytesInLastPage);
384+
}
385+
}
386+
}
387+
351388
private void ensureCapacity(int bytesNeeded) {
352389
assert bytesNeeded > pageSize - currentPageOffset;
353390
ensureCapacityFromPosition(position() + bytesNeeded);

server/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.common.breaker.CircuitBreaker;
1717
import org.elasticsearch.common.breaker.CircuitBreakingException;
1818
import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
19-
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2019
import org.elasticsearch.common.io.stream.StreamInput;
2120
import org.elasticsearch.common.io.stream.StreamOutput;
2221
import org.elasticsearch.common.recycler.Recycler;

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.elasticsearch.common.bytes.BytesReference;
1717
import org.elasticsearch.common.io.DiskIoBufferPool;
1818
import org.elasticsearch.common.io.stream.BytesStreamOutput;
19-
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
19+
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2020
import org.elasticsearch.common.io.stream.StreamInput;
2121
import org.elasticsearch.common.io.stream.StreamOutput;
2222
import org.elasticsearch.common.io.stream.Writeable;
@@ -63,6 +63,7 @@
6363
import java.util.regex.Matcher;
6464
import java.util.regex.Pattern;
6565
import java.util.stream.Stream;
66+
import java.util.zip.CRC32;
6667

6768
import static org.elasticsearch.core.Strings.format;
6869
import static org.elasticsearch.index.translog.TranslogConfig.EMPTY_TRANSLOG_BUFFER_SIZE;
@@ -610,7 +611,7 @@ TranslogWriter createWriter(
610611
* @throws IOException if adding the operation to the translog resulted in an I/O exception
611612
*/
612613
public Location add(final Operation operation) throws IOException {
613-
try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays)) {
614+
try (RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(bigArrays.bytesRefRecycler())) {
614615
writeOperationWithSize(out, operation);
615616
final BytesReference bytes = out.bytes();
616617
readLock.lock();
@@ -1642,10 +1643,13 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
16421643
out.writeInt((int) checksum);
16431644
}
16441645

1645-
public static void writeOperationWithSize(BytesStreamOutput out, Translog.Operation op) throws IOException {
1646+
public static void writeOperationWithSize(RecyclerBytesStreamOutput out, Translog.Operation op) throws IOException {
16461647
final long start = out.position();
16471648
out.skip(Integer.BYTES);
1648-
writeOperationNoSize(new BufferedChecksumStreamOutput(out), op);
1649+
op.writeTo(out);
1650+
CRC32 checksum = new CRC32();
1651+
out.calculateChecksum(checksum, Math.toIntExact(start + 4));
1652+
out.writeInt((int) checksum.getValue());
16491653
final long end = out.position();
16501654
final int operationSize = (int) (end - Integer.BYTES - start);
16511655
out.seek(start);

0 commit comments

Comments
 (0)