Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 38 additions & 38 deletions topic/src/main/java/tech/ydb/topic/utils/Encoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.github.luben.zstd.ZstdInputStream;
import com.github.luben.zstd.ZstdOutputStream;
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
import org.anarres.lzo.LzoAlgorithm;
import org.anarres.lzo.LzoCompressor;
import org.anarres.lzo.LzoLibrary;
Expand All @@ -25,32 +25,13 @@ public class Encoder {

private Encoder() { }

public static byte[] encode(Codec codec, byte[] input) {
public static byte[] encode(Codec codec, byte[] input) throws IOException {
if (codec == Codec.RAW) {
return input;
}
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
OutputStream os;
try {
switch (codec) {
case GZIP:
os = new GZIPOutputStream(byteArrayOutputStream);
break;
case ZSTD:
os = new ZstdOutputStream(byteArrayOutputStream);
break;
case LZOP:
LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null);
os = new LzoOutputStream(byteArrayOutputStream, lzoCompressor);
break;
case CUSTOM:
default:
throw new RuntimeException("Unsupported codec: " + codec);
}
try (OutputStream os = makeOutputStream(codec, byteArrayOutputStream)) {
os.write(input);
os.close();
} catch (IOException exception) {
throw new RuntimeException(exception);
}
return byteArrayOutputStream.toByteArray();
}
Expand All @@ -60,30 +41,49 @@ public static byte[] decode(Codec codec, byte[] input) throws IOException {
return input;
}

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input);
InputStream is;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input);
InputStream is = makeInputStream(codec, byteArrayInputStream)
) {
byte[] buffer = new byte[1024];
int length;
while ((length = is.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, length);
}
return byteArrayOutputStream.toByteArray();
}
}

private static OutputStream makeOutputStream(Codec codec,
ByteArrayOutputStream byteArrayOutputStream) throws IOException {
switch (codec) {
case GZIP:
is = new GZIPInputStream(byteArrayInputStream);
break;
return new GZIPOutputStream(byteArrayOutputStream);
case ZSTD:
is = new ZstdInputStream(byteArrayInputStream);
break;
return new ZstdOutputStreamNoFinalizer(byteArrayOutputStream);
case LZOP:
is = new LzopInputStream(byteArrayInputStream);
break;
LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null);
return new LzoOutputStream(byteArrayOutputStream, lzoCompressor);
case CUSTOM:
default:
throw new RuntimeException("Unsupported codec: " + codec);
}
byte[] buffer = new byte[1024];
int length;
while ((length = is.read(buffer)) != -1) {
byteArrayOutputStream.write(buffer, 0, length);
}

private static InputStream makeInputStream(Codec codec,
ByteArrayInputStream byteArrayInputStream) throws IOException {
switch (codec) {
case GZIP:
return new GZIPInputStream(byteArrayInputStream);
case ZSTD:
return new ZstdInputStreamNoFinalizer(byteArrayInputStream);
case LZOP:
return new LzopInputStream(byteArrayInputStream);
case CUSTOM:
default:
throw new RuntimeException("Unsupported codec: " + codec);
}
is.close();
return byteArrayOutputStream.toByteArray();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.write.impl;

import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -172,7 +173,11 @@ private void encode(EnqueuedMessage message) {
if (settings.getCodec() == Codec.RAW) {
return;
}
message.getMessage().setData(Encoder.encode(settings.getCodec(), message.getMessage().getData()));
try {
message.getMessage().setData(Encoder.encode(settings.getCodec(), message.getMessage().getData()));
} catch (IOException exception) {
throw new RuntimeException("Couldn't encode a message", exception);
}
message.setCompressedSizeBytes(message.getMessage().getData().length);
message.setCompressed(true);
logger.trace("[{}] Successfully finished encoding message", id);
Expand Down