Skip to content

Commit ae8cbcf

Browse files
Move ZstdCompression mode out of Zstd814StoredFieldsFormat (#137385)
Move ZstdCompressor, ZstdDecompressor, and ZstdCompressionMode out of Zstd814StoredFieldsFormat so they can be used for doc values in #137139. Keep ZstdCompressor and ZstdDecompressor as private classes within ZstdCompressionMode since they don't need to be directly instantiated.
1 parent c534b97 commit ae8cbcf

File tree

2 files changed

+157
-143
lines changed

2 files changed

+157
-143
lines changed

server/src/main/java/org/elasticsearch/index/codec/zstd/Zstd814StoredFieldsFormat.java

Lines changed: 0 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,10 @@
1010
package org.elasticsearch.index.codec.zstd;
1111

1212
import org.apache.lucene.codecs.StoredFieldsWriter;
13-
import org.apache.lucene.codecs.compressing.CompressionMode;
14-
import org.apache.lucene.codecs.compressing.Compressor;
15-
import org.apache.lucene.codecs.compressing.Decompressor;
1613
import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
17-
import org.apache.lucene.index.CorruptIndexException;
1814
import org.apache.lucene.index.SegmentInfo;
19-
import org.apache.lucene.store.ByteBuffersDataInput;
20-
import org.apache.lucene.store.DataInput;
21-
import org.apache.lucene.store.DataOutput;
2215
import org.apache.lucene.store.Directory;
2316
import org.apache.lucene.store.IOContext;
24-
import org.apache.lucene.util.ArrayUtil;
25-
import org.apache.lucene.util.BytesRef;
26-
import org.elasticsearch.nativeaccess.CloseableByteBuffer;
27-
import org.elasticsearch.nativeaccess.NativeAccess;
28-
import org.elasticsearch.nativeaccess.Zstd;
2917

3018
import java.io.IOException;
3119

@@ -89,135 +77,4 @@ public Mode getMode() {
8977
return mode;
9078
}
9179

92-
/**
 * A {@link CompressionMode} backed by Zstandard (zstd). Factory for the
 * matched {@link ZstdCompressor}/{@link ZstdDecompressor} pair used by the
 * stored fields format.
 */
private static class ZstdCompressionMode extends CompressionMode {
    // Zstd compression level handed to every compressor this mode creates.
    private final int level;

    ZstdCompressionMode(int level) {
        this.level = level;
    }

    @Override
    public Compressor newCompressor() {
        return new ZstdCompressor(level);
    }

    @Override
    public Decompressor newDecompressor() {
        return new ZstdDecompressor();
    }

    @Override
    public String toString() {
        return "ZSTD(level=" + level + ")";
    }
}
114-
115-
/**
 * Decompresses blocks written by {@code ZstdCompressor}: a vint-encoded
 * compressed length followed by the zstd-compressed bytes. Decompression runs
 * in native memory via {@link NativeAccess}.
 */
private static final class ZstdDecompressor extends Decompressor {

    // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
    // to benefit from bulk copying and low enough to keep heap usage under control.
    final byte[] copyBuffer = new byte[4096];

    ZstdDecompressor() {}

    /**
     * Reads a vint compressed length from {@code in}, decompresses into native
     * memory, and copies the {@code [offset, offset + length)} slice of the
     * decompressed data into {@code bytes} (starting at index 0).
     *
     * @throws CorruptIndexException if the decompressed size differs from {@code originalLength}
     */
    @Override
    public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
        if (originalLength == 0) {
            // The compressor writes nothing at all for empty input, so there is nothing to read here.
            bytes.offset = 0;
            bytes.length = 0;
            return;
        }

        final NativeAccess nativeAccess = NativeAccess.instance();
        final Zstd zstd = nativeAccess.getZstd();

        final int compressedLength = in.readVInt();

        try (
            CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength);
            CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength)
        ) {

            // Stage the compressed bytes into native memory, copyBuffer-sized chunks at a time.
            while (src.buffer().position() < compressedLength) {
                final int numBytes = Math.min(copyBuffer.length, compressedLength - src.buffer().position());
                in.readBytes(copyBuffer, 0, numBytes);
                src.buffer().put(copyBuffer, 0, numBytes);
            }
            src.buffer().flip();

            final int decompressedLen = zstd.decompress(dest, src);
            if (decompressedLen != originalLength) {
                throw new CorruptIndexException("Expected " + originalLength + " decompressed bytes, got " + decompressedLen, in);
            }

            // growNoCopy: previous contents of bytes.bytes are irrelevant, only capacity matters.
            bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, length);
            // Absolute bulk get: copy only the requested window of the decompressed data.
            dest.buffer().get(offset, bytes.bytes, 0, length);
            bytes.offset = 0;
            bytes.length = length;
        }
    }

    @Override
    public Decompressor clone() {
        // Stateless apart from the scratch buffer, so a fresh instance is an equivalent clone.
        return new ZstdDecompressor();
    }
}
165-
166-
/**
 * Compresses stored-fields blocks with zstd in native memory via
 * {@link NativeAccess}. Output framing: a vint-encoded compressed length
 * followed by the compressed bytes; empty input writes nothing at all.
 */
private static class ZstdCompressor extends Compressor {

    // Zstd compression level passed to each compress call.
    final int level;
    // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
    // to benefit from bulk copying and low enough to keep heap usage under control.
    final byte[] copyBuffer = new byte[4096];

    ZstdCompressor(int level) {
        this.level = level;
    }

    /**
     * Compresses all of {@code buffersInput} and writes the vint length plus
     * compressed bytes to {@code out}. Writes nothing for empty input,
     * mirroring the {@code originalLength == 0} early return on the
     * decompression side.
     */
    @Override
    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
        final NativeAccess nativeAccess = NativeAccess.instance();
        final Zstd zstd = nativeAccess.getZstd();

        final int srcLen = Math.toIntExact(buffersInput.length());
        if (srcLen == 0) {
            return;
        }

        // Worst-case compressed size, used to size the native destination buffer.
        final int compressBound = zstd.compressBound(srcLen);

        // NOTE: We are allocating/deallocating native buffers on each call. We could save allocations by reusing these buffers, though
        // this would come at the expense of higher permanent memory usage. Benchmarks suggested that there is some performance to save
        // there, but it wouldn't be a game changer either.
        // Also note that calls to #compress implicitly allocate memory under the hood for e.g. hash tables and chain tables that help
        // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
        // reusing compression contexts, which are not small and would increase permanent memory usage as well.
        try (
            CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen);
            CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound)
        ) {

            // Stage the input into native memory, copyBuffer-sized chunks at a time.
            while (buffersInput.position() < buffersInput.length()) {
                final int numBytes = Math.min(copyBuffer.length, (int) (buffersInput.length() - buffersInput.position()));
                buffersInput.readBytes(copyBuffer, 0, numBytes);
                src.buffer().put(copyBuffer, 0, numBytes);
            }
            src.buffer().flip();

            final int compressedLen = zstd.compress(dest, src, level);
            out.writeVInt(compressedLen);

            // Drain the compressed bytes back out of native memory in chunks.
            for (int written = 0; written < compressedLen;) {
                final int numBytes = Math.min(copyBuffer.length, compressedLen - written);
                dest.buffer().get(copyBuffer, 0, numBytes);
                out.writeBytes(copyBuffer, 0, numBytes);
                written += numBytes;
                // The relative get above advances the buffer position in lock step with `written`.
                assert written == dest.buffer().position();
            }
        }
    }

    @Override
    public void close() throws IOException {}
}
22380
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.zstd;
11+
12+
import org.apache.lucene.codecs.compressing.CompressionMode;
13+
import org.apache.lucene.codecs.compressing.Compressor;
14+
import org.apache.lucene.codecs.compressing.Decompressor;
15+
import org.apache.lucene.index.CorruptIndexException;
16+
import org.apache.lucene.store.ByteBuffersDataInput;
17+
import org.apache.lucene.store.DataInput;
18+
import org.apache.lucene.store.DataOutput;
19+
import org.apache.lucene.util.ArrayUtil;
20+
import org.apache.lucene.util.BytesRef;
21+
import org.elasticsearch.nativeaccess.CloseableByteBuffer;
22+
import org.elasticsearch.nativeaccess.NativeAccess;
23+
import org.elasticsearch.nativeaccess.Zstd;
24+
25+
import java.io.IOException;
26+
27+
/**
 * A {@link CompressionMode} backed by Zstandard (zstd), executed in native
 * memory through {@link NativeAccess}. Each compressed block is framed as a
 * vint-encoded compressed length followed by the zstd-compressed bytes; empty
 * input writes nothing at all.
 *
 * <p>{@code ZstdCompressor} and {@code ZstdDecompressor} are private nested
 * classes since callers only ever instantiate the mode itself.
 */
public class ZstdCompressionMode extends CompressionMode {
    // Zstd compression level handed to every compressor this mode creates.
    private final int level;

    /**
     * Creates a compression mode that compresses with the given zstd level.
     *
     * @param level the zstd compression level to use
     */
    public ZstdCompressionMode(int level) {
        this.level = level;
    }

    @Override
    public Compressor newCompressor() {
        return new ZstdCompressor(level);
    }

    @Override
    public Decompressor newDecompressor() {
        return new ZstdDecompressor();
    }

    @Override
    public String toString() {
        return "ZSTD(level=" + level + ")";
    }

    /** Compresses blocks with zstd in native memory; see the class javadoc for the framing. */
    private static final class ZstdCompressor extends Compressor {

        // Zstd compression level passed to each compress call.
        final int level;
        // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
        // to benefit from bulk copying and low enough to keep heap usage under control.
        final byte[] copyBuffer = new byte[4096];

        private ZstdCompressor(int level) {
            this.level = level;
        }

        /**
         * Compresses all of {@code buffersInput} and writes the vint length
         * plus compressed bytes to {@code out}. Writes nothing for empty
         * input, mirroring the {@code originalLength == 0} early return in
         * {@code ZstdDecompressor#decompress}.
         */
        @Override
        public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
            final NativeAccess nativeAccess = NativeAccess.instance();
            final Zstd zstd = nativeAccess.getZstd();

            final int srcLen = Math.toIntExact(buffersInput.length());
            if (srcLen == 0) {
                return;
            }

            // Worst-case compressed size, used to size the native destination buffer.
            final int compressBound = zstd.compressBound(srcLen);

            // NOTE: We are allocating/deallocating native buffers on each call. We could save allocations by reusing these buffers, though
            // this would come at the expense of higher permanent memory usage. Benchmarks suggested that there is some performance to save
            // there, but it wouldn't be a game changer either.
            // Also note that calls to #compress implicitly allocate memory under the hood for e.g. hash tables and chain tables that help
            // identify duplicate strings. So if we wanted to avoid allocating memory on every compress call, we should also look into
            // reusing compression contexts, which are not small and would increase permanent memory usage as well.
            try (
                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(srcLen);
                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(compressBound)
            ) {

                // Stage the input into native memory, copyBuffer-sized chunks at a time.
                while (buffersInput.position() < buffersInput.length()) {
                    final int numBytes = Math.min(copyBuffer.length, (int) (buffersInput.length() - buffersInput.position()));
                    buffersInput.readBytes(copyBuffer, 0, numBytes);
                    src.buffer().put(copyBuffer, 0, numBytes);
                }
                src.buffer().flip();

                final int compressedLen = zstd.compress(dest, src, level);
                out.writeVInt(compressedLen);

                // Drain the compressed bytes back out of native memory in chunks.
                for (int written = 0; written < compressedLen;) {
                    final int numBytes = Math.min(copyBuffer.length, compressedLen - written);
                    dest.buffer().get(copyBuffer, 0, numBytes);
                    out.writeBytes(copyBuffer, 0, numBytes);
                    written += numBytes;
                    // The relative get above advances the buffer position in lock step with `written`.
                    assert written == dest.buffer().position();
                }
            }
        }

        @Override
        public void close() throws IOException {}
    }

    /** Decompresses blocks produced by {@code ZstdCompressor}. */
    private static final class ZstdDecompressor extends Decompressor {

        // Buffer for copying between the DataInput and native memory. No hard science behind this number, it just tries to be high enough
        // to benefit from bulk copying and low enough to keep heap usage under control.
        final byte[] copyBuffer = new byte[4096];

        private ZstdDecompressor() {}

        /**
         * Reads a vint compressed length from {@code in}, decompresses into
         * native memory, and copies the {@code [offset, offset + length)}
         * slice of the decompressed data into {@code bytes} (starting at
         * index 0).
         *
         * @throws CorruptIndexException if the decompressed size differs from {@code originalLength}
         */
        @Override
        public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
            if (originalLength == 0) {
                // The compressor writes nothing at all for empty input, so there is nothing to read here.
                bytes.offset = 0;
                bytes.length = 0;
                return;
            }

            final NativeAccess nativeAccess = NativeAccess.instance();
            final Zstd zstd = nativeAccess.getZstd();

            final int compressedLength = in.readVInt();

            try (
                CloseableByteBuffer src = nativeAccess.newConfinedBuffer(compressedLength);
                CloseableByteBuffer dest = nativeAccess.newConfinedBuffer(originalLength)
            ) {

                // Stage the compressed bytes into native memory, copyBuffer-sized chunks at a time.
                while (src.buffer().position() < compressedLength) {
                    final int numBytes = Math.min(copyBuffer.length, compressedLength - src.buffer().position());
                    in.readBytes(copyBuffer, 0, numBytes);
                    src.buffer().put(copyBuffer, 0, numBytes);
                }
                src.buffer().flip();

                final int decompressedLen = zstd.decompress(dest, src);
                if (decompressedLen != originalLength) {
                    throw new CorruptIndexException("Expected " + originalLength + " decompressed bytes, got " + decompressedLen, in);
                }

                // growNoCopy: previous contents of bytes.bytes are irrelevant, only capacity matters.
                bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, length);
                // Absolute bulk get: copy only the requested window of the decompressed data.
                dest.buffer().get(offset, bytes.bytes, 0, length);
                bytes.offset = 0;
                bytes.length = length;
            }
        }

        @Override
        public Decompressor clone() {
            // Stateless apart from the scratch buffer, so a fresh instance is an equivalent clone.
            return new ZstdDecompressor();
        }
    }
}

0 commit comments

Comments
 (0)