From 2a3e5d21821ce92c9de2a6db7de8697fcf0aef00 Mon Sep 17 00:00:00 2001
From: Parker Timmins
Date: Thu, 30 Oct 2025 10:13:23 -0500
Subject: [PATCH] Move ZstdCompressionMode out of Zstd814StoredFieldsFormat

---
 .../codec/zstd/Zstd814StoredFieldsFormat.java | 143 ----------------
 .../index/codec/zstd/ZstdCompressionMode.java | 157 ++++++++++++++++++
 2 files changed, 157 insertions(+), 143 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/index/codec/zstd/ZstdCompressionMode.java

diff --git a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java
index f40ed5baf74d6..8fc8042fa7fac 100644
--- a/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java
+++ b/server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java
@@ -10,22 +10,10 @@
 package org.elasticsearch.index.codec.zstd;
 
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Compressor;
-import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
-import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.store.ByteBuffersDataInput;
-import org.apache.lucene.store.DataInput;
-import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.nativeaccess.CloseableByteBuffer;
-import org.elasticsearch.nativeaccess.NativeAccess;
-import org.elasticsearch.nativeaccess.Zstd;
 
 import java.io.IOException;
 
@@ -89,135 +77,4 @@ public Mode getMode() {
         return mode;
     }
 
-    private static class ZstdCompressionMode extends CompressionMode {
-        private final int level;
-
-        ZstdCompressionMode(int level) {
-            this.level = level;
-        }
-
-        @Override
-        public Compressor newCompressor() {
-            return new ZstdCompressor(level);
-        }
-
-        @Override
-        public Decompressor newDecompressor() {
-            return new ZstdDecompressor();
-        }
-
-        @Override
-        public String toString() {
-            return "ZSTD(level=" + level + ")";
-        }
-    }
-
-    private static final class ZstdDecompressor extends Decompressor {
-
-        // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
-        // to benefit from bulk copying and low enough to keep heap usage under control.
-        final byte[] copyBuffer = new byte[4096];
-
-        ZstdDecompressor() {}
-
-        @Override
-        public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
-            if (originalLength == 0) {
-                bytes.offset = 0;
-                bytes.length = 0;
-                return;
-            }
-
-            final NativeAccess nativeAccess = NativeAccess.instance();
-            final Zstd zstd = nativeAccess.getZstd();
-
-            final int compressedLength = in.readVInt();
-
-            try (
-                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength);
-                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength)
-            ) {
-
-                while (src.buffer().position() < compressedLength) {
-                    final int numBytes = Math.min(copyBuffer.length, compressedLength - src.buffer().position());
-                    in.readBytes(copyBuffer, 0, numBytes);
-                    src.buffer().put(copyBuffer, 0, numBytes);
-                }
-                src.buffer().flip();
-
-                final int decompressedLen = zstd.decompress(dest, src);
-                if (decompressedLen != originalLength) {
-                    throw new CorruptIndexException("Expected " + originalLength + " decompressed bytes, got " + decompressedLen, in);
-                }
-
-                bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, length);
-                dest.buffer().get(offset, bytes.bytes, 0, length);
-                bytes.offset = 0;
-                bytes.length = length;
-            }
-        }
-
-        @Override
-        public Decompressor clone() {
-            return new ZstdDecompressor();
-        }
-    }
-
-    private static class ZstdCompressor extends Compressor {
-
-        final int level;
-        // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
-        // to benefit from bulk copying and low enough to keep heap usage under control.
-        final byte[] copyBuffer = new byte[4096];
-
-        ZstdCompressor(int level) {
-            this.level = level;
-        }
-
-        @Override
-        public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
-            final NativeAccess nativeAccess = NativeAccess.instance();
-            final Zstd zstd = nativeAccess.getZstd();
-
-            final int srcLen = Math.toIntExact(buffersInput.length());
-            if (srcLen == 0) {
-                return;
-            }
-
-            final int compressBound = zstd.compressBound(srcLen);
-
-            // NOTE: We are allocating/deallocating native buffers on each call. We could save allocations by reusing these buffers, though
-            // this would come at the expense of higher permanent memory usage. Benchmarks suggested that there is some performance to save
-            // there, but it wouldn't be a game changer either.
-            // Also note that calls to #compress implicitly allocate memory under the hood for e.g. hash tables and chain tables that help
-            // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
-            // reusing compression contexts, which are not small and would increase permanent memory usage as well.
-            try (
-                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen);
-                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound)
-            ) {
-
-                while (buffersInput.position() < buffersInput.length()) {
-                    final int numBytes = Math.min(copyBuffer.length, (int) (buffersInput.length() - buffersInput.position()));
-                    buffersInput.readBytes(copyBuffer, 0, numBytes);
-                    src.buffer().put(copyBuffer, 0, numBytes);
-                }
-                src.buffer().flip();
-
-                final int compressedLen = zstd.compress(dest, src, level);
-                out.writeVInt(compressedLen);
-
-                for (int written = 0; written < compressedLen;) {
-                    final int numBytes = Math.min(copyBuffer.length, compressedLen - written);
-                    dest.buffer().get(copyBuffer, 0, numBytes);
-                    out.writeBytes(copyBuffer, 0, numBytes);
-                    written += numBytes;
-                    assert written == dest.buffer().position();
-                }
-            }
-        }
-
-        @Override
-        public void close() throws IOException {}
-    }
 }
diff --git a/server/src/main/java/org/elasticsearch/index/codec/zstd/ZstdCompressionMode.java b/server/src/main/java/org/elasticsearch/index/codec/zstd/ZstdCompressionMode.java
new file mode 100644
index 0000000000000..350e10cfeddbd
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/codec/zstd/ZstdCompressionMode.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.codec.zstd;
+
+import org.apache.lucene.codecs.compressing.CompressionMode;
+import org.apache.lucene.codecs.compressing.Compressor;
+import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.nativeaccess.CloseableByteBuffer;
+import org.elasticsearch.nativeaccess.NativeAccess;
+import org.elasticsearch.nativeaccess.Zstd;
+
+import java.io.IOException;
+
+public class ZstdCompressionMode extends CompressionMode {
+    private final int level;
+
+    public ZstdCompressionMode(int level) {
+        this.level = level;
+    }
+
+    @Override
+    public Compressor newCompressor() {
+        return new ZstdCompressor(level);
+    }
+
+    @Override
+    public Decompressor newDecompressor() {
+        return new ZstdDecompressor();
+    }
+
+    @Override
+    public String toString() {
+        return "ZSTD(level=" + level + ")";
+    }
+
+    private static final class ZstdCompressor extends Compressor {
+
+        final int level;
+        // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
+        // to benefit from bulk copying and low enough to keep heap usage under control.
+        final byte[] copyBuffer = new byte[4096];
+
+        private ZstdCompressor(int level) {
+            this.level = level;
+        }
+
+        @Override
+        public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+            final NativeAccess nativeAccess = NativeAccess.instance();
+            final Zstd zstd = nativeAccess.getZstd();
+
+            final int srcLen = Math.toIntExact(buffersInput.length());
+            if (srcLen == 0) {
+                return;
+            }
+
+            final int compressBound = zstd.compressBound(srcLen);
+
+            // NOTE: We are allocating/deallocating native buffers on each call. We could save allocations by reusing these buffers, though
+            // this would come at the expense of higher permanent memory usage. Benchmarks suggested that there is some performance to save
+            // there, but it wouldn't be a game changer either.
+            // Also note that calls to #compress implicitly allocate memory under the hood for e.g. hash tables and chain tables that help
+            // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
+            // reusing compression contexts, which are not small and would increase permanent memory usage as well.
+            try (
+                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen);
+                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound)
+            ) {
+
+                while (buffersInput.position() < buffersInput.length()) {
+                    final int numBytes = Math.min(copyBuffer.length, (int) (buffersInput.length() - buffersInput.position()));
+                    buffersInput.readBytes(copyBuffer, 0, numBytes);
+                    src.buffer().put(copyBuffer, 0, numBytes);
+                }
+                src.buffer().flip();
+
+                final int compressedLen = zstd.compress(dest, src, level);
+                out.writeVInt(compressedLen);
+
+                for (int written = 0; written < compressedLen;) {
+                    final int numBytes = Math.min(copyBuffer.length, compressedLen - written);
+                    dest.buffer().get(copyBuffer, 0, numBytes);
+                    out.writeBytes(copyBuffer, 0, numBytes);
+                    written += numBytes;
+                    assert written == dest.buffer().position();
+                }
+            }
+        }
+
+        @Override
+        public void close() throws IOException {}
+    }
+
+    private static final class ZstdDecompressor extends Decompressor {
+
+        // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
+        // to benefit from bulk copying and low enough to keep heap usage under control.
+        final byte[] copyBuffer = new byte[4096];
+
+        private ZstdDecompressor() {}
+
+        @Override
+        public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
+            if (originalLength == 0) {
+                bytes.offset = 0;
+                bytes.length = 0;
+                return;
+            }
+
+            final NativeAccess nativeAccess = NativeAccess.instance();
+            final Zstd zstd = nativeAccess.getZstd();
+
+            final int compressedLength = in.readVInt();
+
+            try (
+                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength);
+                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength)
+            ) {
+
+                while (src.buffer().position() < compressedLength) {
+                    final int numBytes = Math.min(copyBuffer.length, compressedLength - src.buffer().position());
+                    in.readBytes(copyBuffer, 0, numBytes);
+                    src.buffer().put(copyBuffer, 0, numBytes);
+                }
+                src.buffer().flip();
+
+                final int decompressedLen = zstd.decompress(dest, src);
+                if (decompressedLen != originalLength) {
+                    throw new CorruptIndexException("Expected " + originalLength + " decompressed bytes, got " + decompressedLen, in);
+                }
+
+                bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, length);
+                dest.buffer().get(offset, bytes.bytes, 0, length);
+                bytes.offset = 0;
+                bytes.length = length;
+            }
+        }
+
+        @Override
+        public Decompressor clone() {
+            return new ZstdDecompressor();
+        }
+    }
+}