Skip to content

Commit cc2c990

Browse files
committed
ORC-2075: Support new Lz4Codec based on lz4-java
### What changes were proposed in this pull request? This PR adds a new `Lz4Codec` based on `lz4-java` to resolve the following Dependabot alert. - https://github.com/apache/orc/security/dependabot/33 ### Why are the changes needed? Aircompressor is no longer a viable option because its 2.0.x line is frozen, so we should switch to `lz4-java`. - https://github.com/yawkat/lz4-java ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: `Gemini 3 Pro (High)` on `Antigravity` Closes #2511 from dongjoon-hyun/ORC-2075. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 66375ec commit cc2c990

File tree

5 files changed

+262
-2
lines changed

5 files changed

+262
-2
lines changed

java/core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@
5252
<groupId>io.airlift</groupId>
5353
<artifactId>aircompressor</artifactId>
5454
</dependency>
55+
<dependency>
56+
<groupId>at.yawk.lz4</groupId>
57+
<artifactId>lz4-java</artifactId>
58+
</dependency>
5559
<dependency>
5660
<groupId>com.github.luben</groupId>
5761
<artifactId>zstd-jni</artifactId>
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.orc.impl;
20+
21+
import net.jpountz.lz4.LZ4Compressor;
22+
import net.jpountz.lz4.LZ4Factory;
23+
import net.jpountz.lz4.LZ4SafeDecompressor;
24+
import org.apache.orc.CompressionCodec;
25+
import org.apache.orc.CompressionKind;
26+
27+
import java.io.IOException;
28+
import java.nio.ByteBuffer;
29+
30+
public class Lz4Codec implements CompressionCodec, DirectDecompressionCodec {
31+
private static final LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
32+
private static final ThreadLocal<byte[]> threadBuffer = ThreadLocal.withInitial(() -> null);
33+
34+
public Lz4Codec() {}
35+
36+
protected static byte[] getBuffer(int size) {
37+
byte[] result = threadBuffer.get();
38+
if (result == null || result.length < size || result.length > size * 2) {
39+
result = new byte[size];
40+
threadBuffer.set(result);
41+
}
42+
return result;
43+
}
44+
45+
@Override
46+
public Options getDefaultOptions() {
47+
return CompressionCodec.NullOptions.INSTANCE;
48+
}
49+
50+
@Override
51+
public boolean compress(ByteBuffer in, ByteBuffer out,
52+
ByteBuffer overflow,
53+
Options options) throws IOException {
54+
int inBytes = in.remaining();
55+
// Skip with minimum size check similar to ZstdCodec
56+
if (inBytes < 10) return false;
57+
58+
LZ4Compressor compressor = lz4Factory.fastCompressor();
59+
int maxOutputLength = compressor.maxCompressedLength(inBytes);
60+
byte[] compressed = getBuffer(maxOutputLength);
61+
62+
int outBytes = compressor.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
63+
compressed, 0, maxOutputLength);
64+
65+
if (outBytes < inBytes) {
66+
int remaining = out.remaining();
67+
if (remaining >= outBytes) {
68+
System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
69+
out.position(), outBytes);
70+
out.position(out.position() + outBytes);
71+
} else {
72+
System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
73+
out.position(), remaining);
74+
out.position(out.limit());
75+
System.arraycopy(compressed, remaining, overflow.array(),
76+
overflow.arrayOffset(), outBytes - remaining);
77+
overflow.position(outBytes - remaining);
78+
}
79+
return true;
80+
} else {
81+
return false;
82+
}
83+
}
84+
85+
@Override
86+
public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
87+
if (in.isDirect() && out.isDirect()) {
88+
directDecompress(in, out);
89+
return;
90+
}
91+
92+
int srcOffset = in.arrayOffset() + in.position();
93+
int srcSize = in.remaining();
94+
int dstOffset = out.arrayOffset() + out.position();
95+
int dstSize = out.remaining();
96+
97+
LZ4SafeDecompressor decompressor = lz4Factory.safeDecompressor();
98+
int decompressedBytes = decompressor.decompress(in.array(), srcOffset, srcSize, out.array(),
99+
dstOffset, dstSize);
100+
101+
in.position(in.limit());
102+
out.position(dstOffset + decompressedBytes);
103+
out.flip();
104+
}
105+
106+
@Override
107+
public boolean isAvailable() {
108+
return true;
109+
}
110+
111+
@Override
112+
public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException {
113+
LZ4SafeDecompressor decompressor = lz4Factory.safeDecompressor();
114+
decompressor.decompress(in, out);
115+
out.flip();
116+
}
117+
118+
@Override
119+
public void reset() {
120+
}
121+
122+
@Override
123+
public void destroy() {
124+
}
125+
126+
@Override
127+
public CompressionKind getKind() {
128+
return CompressionKind.LZ4;
129+
}
130+
131+
@Override
132+
public void close() {
133+
OrcCodecPool.returnCodec(CompressionKind.LZ4, this);
134+
}
135+
}

java/core/src/java/org/apache/orc/impl/WriterImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,12 @@ public static CompressionCodec createCodec(CompressionKind kind) {
300300
return new AircompressorCodec(kind, new LzoCompressor(),
301301
new LzoDecompressor());
302302
case LZ4:
303-
return new AircompressorCodec(kind, new Lz4Compressor(),
304-
new Lz4Decompressor());
303+
if ("aircompressor".equalsIgnoreCase(System.getProperty("orc.compress.lz4.impl"))) {
304+
return new AircompressorCodec(kind, new Lz4Compressor(),
305+
new Lz4Decompressor());
306+
} else {
307+
return new Lz4Codec();
308+
}
305309
case ZSTD:
306310
if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) {
307311
return new AircompressorCodec(kind, new ZstdCompressor(),
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.orc.impl;
20+
21+
import net.jpountz.lz4.LZ4Exception;
22+
import org.apache.orc.CompressionCodec;
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.nio.ByteBuffer;
26+
27+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
28+
import static org.junit.jupiter.api.Assertions.assertFalse;
29+
import static org.junit.jupiter.api.Assertions.assertTrue;
30+
import static org.junit.jupiter.api.Assertions.fail;
31+
32+
public class TestLz4 {
33+
34+
@Test
35+
public void testNoOverflow() throws Exception {
36+
ByteBuffer in = ByteBuffer.allocate(10);
37+
ByteBuffer out = ByteBuffer.allocate(10);
38+
in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10});
39+
in.flip();
40+
CompressionCodec codec = new Lz4Codec();
41+
assertFalse(codec.compress(in, out, null,
42+
codec.getDefaultOptions()));
43+
}
44+
45+
@Test
46+
public void testCorrupt() throws Exception {
47+
ByteBuffer buf = ByteBuffer.allocate(1000);
48+
buf.put(new byte[] {127, 125, 1, 99, 98, 1});
49+
buf.flip();
50+
CompressionCodec codec = new Lz4Codec();
51+
ByteBuffer out = ByteBuffer.allocate(1000);
52+
try {
53+
codec.decompress(buf, out);
54+
fail();
55+
} catch (LZ4Exception ioe) {
56+
// EXPECTED
57+
}
58+
}
59+
60+
@Test
61+
public void testLz4CompressDecompress() throws Exception {
62+
int inputSize = 10000;
63+
CompressionCodec codec = new Lz4Codec();
64+
65+
ByteBuffer in = ByteBuffer.allocate(inputSize);
66+
ByteBuffer out = ByteBuffer.allocate(inputSize);
67+
ByteBuffer compressed = ByteBuffer.allocate(inputSize * 2); // Ample space for compressed data
68+
ByteBuffer decompressed = ByteBuffer.allocate(inputSize);
69+
70+
for (int i = 0; i < inputSize; i++) {
71+
in.put((byte) i);
72+
}
73+
in.flip();
74+
75+
// Compress
76+
assertTrue(codec.compress(in, compressed, null, codec.getDefaultOptions()));
77+
compressed.flip();
78+
79+
// Decompress
80+
codec.decompress(compressed, decompressed);
81+
82+
assertArrayEquals(in.array(), decompressed.array());
83+
}
84+
85+
@Test
86+
public void testLz4DirectDecompress() {
87+
ByteBuffer in = ByteBuffer.allocate(10000);
88+
ByteBuffer out = ByteBuffer.allocate(10000); // Heap buffer for initial compression
89+
ByteBuffer directOut = ByteBuffer.allocateDirect(10000);
90+
ByteBuffer directResult = ByteBuffer.allocateDirect(10000);
91+
for (int i = 0; i < 10000; i++) {
92+
in.put((byte) i);
93+
}
94+
in.flip();
95+
try (Lz4Codec codec = new Lz4Codec()) {
96+
assertTrue(codec.compress(in, out, null, codec.getDefaultOptions()));
97+
out.flip();
98+
directOut.put(out);
99+
directOut.flip();
100+
101+
codec.decompress(directOut, directResult);
102+
103+
// copy result from direct buffer to heap.
104+
byte[] heapBytes = new byte[in.array().length];
105+
directResult.get(heapBytes, 0, directResult.limit());
106+
107+
assertArrayEquals(in.array(), heapBytes);
108+
} catch (Exception e) {
109+
fail(e);
110+
}
111+
}
112+
}

java/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@
166166
<artifactId>aircompressor</artifactId>
167167
<version>2.0.2</version>
168168
</dependency>
169+
<dependency>
170+
<groupId>at.yawk.lz4</groupId>
171+
<artifactId>lz4-java</artifactId>
172+
<version>1.10.3</version>
173+
</dependency>
169174
<dependency>
170175
<groupId>com.github.luben</groupId>
171176
<artifactId>zstd-jni</artifactId>

0 commit comments

Comments
 (0)