Skip to content

Commit 784cd7c

Browse files
authored
[ISSUE #959] Automatically select the appropriate decompression algorithm according to magic code (#960)
1 parent 765a1cb commit 784cd7c

File tree

5 files changed

+115
-20
lines changed

5 files changed

+115
-20
lines changed

java/client/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,15 @@
127127
<artifactId>awaitility</artifactId>
128128
<scope>test</scope>
129129
</dependency>
130+
131+
<dependency>
132+
<groupId>com.github.luben</groupId>
133+
<artifactId>zstd-jni</artifactId>
134+
</dependency>
135+
<dependency>
136+
<groupId>org.lz4</groupId>
137+
<artifactId>lz4-java</artifactId>
138+
</dependency>
130139
</dependencies>
131140

132141
<build>

java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public static MessageViewImpl fromProtobuf(Message message, MessageQueueImpl mq,
274274
switch (bodyEncoding) {
275275
case GZIP:
276276
try {
277-
body = Utilities.uncompressBytesGzip(body);
277+
body = Utilities.decompressBytes(body);
278278
} catch (IOException e) {
279279
log.error("Failed to uncompress message body, topic={}, messageId={}", topic, messageId);
280280
corrupted = true;

java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818
package org.apache.rocketmq.client.java.misc;
1919

2020
import apache.rocketmq.v2.ReceiveMessageRequest;
21+
import com.github.luben.zstd.ZstdInputStream;
22+
import com.github.luben.zstd.ZstdOutputStream;
2123
import java.io.ByteArrayInputStream;
2224
import java.io.ByteArrayOutputStream;
25+
import java.io.FilterInputStream;
26+
import java.io.FilterOutputStream;
2327
import java.io.IOException;
28+
import java.io.InputStream;
2429
import java.lang.management.ManagementFactory;
2530
import java.lang.management.RuntimeMXBean;
2631
import java.net.InetAddress;
@@ -35,7 +40,11 @@
3540
import java.util.Random;
3641
import java.util.zip.CRC32;
3742
import java.util.zip.DeflaterOutputStream;
43+
import java.util.zip.GZIPInputStream;
44+
import java.util.zip.GZIPOutputStream;
3845
import java.util.zip.InflaterInputStream;
46+
import net.jpountz.lz4.LZ4FrameInputStream;
47+
import net.jpountz.lz4.LZ4FrameOutputStream;
3948
import org.apache.commons.lang3.StringUtils;
4049

4150
public class Utilities {
@@ -140,37 +149,59 @@ public static String hostName() {
140149
}
141150
}
142151

143-
public static byte[] compressBytesGzip(final byte[] src, final int level) throws IOException {
144-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
152+
public static byte[] compressBytesGZIP(final byte[] src) throws IOException {
153+
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length)) {
154+
try (FilterOutputStream outputStream = new GZIPOutputStream(byteArrayOutputStream)) {
155+
outputStream.write(src);
156+
outputStream.flush();
157+
}
158+
return byteArrayOutputStream.toByteArray();
159+
}
160+
}
161+
162+
public static byte[] compressBytesZSTD(final byte[] src, final int level) throws IOException {
163+
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length)) {
164+
try (FilterOutputStream outputStream = new ZstdOutputStream(byteArrayOutputStream, level)) {
165+
outputStream.write(src);
166+
outputStream.flush();
167+
}
168+
return byteArrayOutputStream.toByteArray();
169+
}
170+
}
171+
172+
public static byte[] compressBytesZLIB(final byte[] src, final int level) throws IOException {
145173
java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
146-
DeflaterOutputStream deflaterOutputStream =
147-
new DeflaterOutputStream(byteArrayOutputStream, defeater);
148-
try {
174+
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
175+
DeflaterOutputStream deflaterOutputStream =
176+
new DeflaterOutputStream(byteArrayOutputStream, defeater)) {
149177
deflaterOutputStream.write(src);
150178
deflaterOutputStream.finish();
151-
deflaterOutputStream.close();
152-
153179
return byteArrayOutputStream.toByteArray();
154180
} finally {
155-
try {
156-
byteArrayOutputStream.close();
157-
} catch (IOException ignore) {
158-
// Exception not expected here.
159-
}
160181
defeater.end();
161182
}
162183
}
163184

164-
public static byte[] uncompressBytesGzip(final byte[] src) throws IOException {
185+
public static byte[] compressBytesLZ4(byte[] src) throws IOException {
186+
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length)) {
187+
try (FilterOutputStream outputStream = new LZ4FrameOutputStream(byteArrayOutputStream)) {
188+
outputStream.write(src);
189+
outputStream.flush();
190+
}
191+
return byteArrayOutputStream.toByteArray();
192+
}
193+
}
194+
195+
public static byte[] decompressBytes(final byte[] src) throws IOException {
165196
byte[] uncompressData = new byte[src.length];
166197

167198
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
168-
InflaterInputStream inflaterInputStream = new InflaterInputStream(byteArrayInputStream);
199+
FilterInputStream filterInputStream = getStreamByMagicCode(src, byteArrayInputStream);
169200
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
170201

171202
try {
172203
int length;
173-
while ((length = inflaterInputStream.read(uncompressData, 0, uncompressData.length)) > 0) {
204+
while ((length = filterInputStream.read(uncompressData, 0, uncompressData.length)) > 0) {
174205
byteArrayOutputStream.write(uncompressData, 0, length);
175206
}
176207
byteArrayOutputStream.flush();
@@ -183,7 +214,7 @@ public static byte[] uncompressBytesGzip(final byte[] src) throws IOException {
183214
// Exception not expected here.
184215
}
185216
try {
186-
inflaterInputStream.close();
217+
filterInputStream.close();
187218
} catch (IOException ignore) {
188219
// Exception not expected here.
189220
}
@@ -195,6 +226,29 @@ public static byte[] uncompressBytesGzip(final byte[] src) throws IOException {
195226
}
196227
}
197228

229+
private static FilterInputStream getStreamByMagicCode(byte[] src, InputStream inputStream) throws IOException {
230+
// Automatically select the appropriate decompression algorithm according to magic code
231+
// GZIP magic code: 0x1F 0x8B
232+
// ZLIB magic code: 0x78
233+
// LZ4 magic code: 0x04 0x22 0x4D 0x18
234+
// ZSTD magic code: 0x28 0xB5 0x2F 0xFD
235+
FilterInputStream filterInputStream;
236+
if ((src[0] & 0xFF) == 0x1F && (src[1] & 0xFF) == 0x8B) {
237+
filterInputStream = new GZIPInputStream(inputStream);
238+
} else if ((src[0] & 0xFF) == 0x78) {
239+
filterInputStream = new InflaterInputStream(inputStream);
240+
} else if ((src[0] & 0xFF) == 0x04 && (src[1] & 0xFF) == 0x22 && (src[2] & 0xFF) == 0x4D
241+
&& (src[3] & 0xFF) == 0x18) {
242+
filterInputStream = new LZ4FrameInputStream(inputStream);
243+
} else if (((src[0] & 0xFF) == 0x28 && (src[1] & 0xFF) == 0xB5 && (src[2] & 0xFF) == 0x2F
244+
&& (src[3] & 0xFF) == 0xFD)) {
245+
filterInputStream = new ZstdInputStream(inputStream);
246+
} else {
247+
throw new IOException("Unknown compression format");
248+
}
249+
return filterInputStream;
250+
}
251+
198252
public static String encodeHexString(ByteBuffer byteBuffer, boolean toLowerCase) {
199253
return new String(encodeHex(byteBuffer, toLowerCase));
200254
}

java/client/src/test/java/org/apache/rocketmq/client/java/misc/UtilitiesTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,27 @@ public class UtilitiesTest {
3232
@Test
3333
public void testCompressAndUncompressByteArray() throws IOException {
3434
final byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
35-
final byte[] compressedBytes = Utilities.compressBytesGzip(bytes, 5);
36-
final byte[] originalBytes = Utilities.uncompressBytesGzip(compressedBytes);
37-
assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
35+
{
36+
final byte[] compressedBytes = Utilities.compressBytesZLIB(bytes, 5);
37+
final byte[] originalBytes = Utilities.decompressBytes(compressedBytes);
38+
assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
39+
}
40+
{
41+
final byte[] compressedBytes = Utilities.compressBytesLZ4(bytes);
42+
final byte[] originalBytes = Utilities.decompressBytes(compressedBytes);
43+
assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
44+
}
45+
{
46+
final byte[] compressedBytes = Utilities.compressBytesZSTD(bytes, 3);
47+
final byte[] originalBytes = Utilities.decompressBytes(compressedBytes);
48+
assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
49+
}
50+
{
51+
final byte[] compressedBytes = Utilities.compressBytesGZIP(bytes);
52+
final byte[] originalBytes = Utilities.decompressBytes(compressedBytes);
53+
assertEquals(new String(originalBytes, StandardCharsets.UTF_8), body);
54+
}
55+
3856
}
3957

4058
@Test

java/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@
7474
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
7575
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
7676
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
77+
78+
<zstd-jni.version>1.5.2-2</zstd-jni.version>
79+
<lz4-java.version>1.8.0</lz4-java.version>
7780
</properties>
7881

7982
<distributionManagement>
@@ -250,6 +253,17 @@
250253
<version>${bcpkix.version}</version>
251254
<scope>test</scope>
252255
</dependency>
256+
257+
<dependency>
258+
<groupId>org.lz4</groupId>
259+
<artifactId>lz4-java</artifactId>
260+
<version>${lz4-java.version}</version>
261+
</dependency>
262+
<dependency>
263+
<groupId>com.github.luben</groupId>
264+
<artifactId>zstd-jni</artifactId>
265+
<version>${zstd-jni.version}</version>
266+
</dependency>
253267
</dependencies>
254268
</dependencyManagement>
255269

0 commit comments

Comments
 (0)