-
Notifications
You must be signed in to change notification settings - Fork 31
365.add support of custom codecs in topics #447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
alex268
merged 21 commits into
ydb-platform:release_v2.4.0
from
ekuvardin:365.Add-support-of-custom-codecs-in-topics
May 12, 2025
Merged
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
a13c41f
Add first implementation of custom codec
ekuvardin 626931d
Add codec implementation
ekuvardin 5cfb6f3
Change contract to use custom codec
ekuvardin 556f992
Add test
ekuvardin 20dc3dc
Add checkstyle and test
ekuvardin a3be1b6
Change contract to CodecRegistry
ekuvardin eda40a1
Add more test and description
ekuvardin fad26f2
Add more test and comments
ekuvardin 40bbcc1
Add more test and comments
ekuvardin caa2966
Merge remote-tracking branch 'origin/365.Add-support-of-custom-codecs…
ekuvardin 9b47dc2
Merge remote-tracking branch 'origin/master' into 365.Add-support-of-…
ekuvardin d6ad0fa
One more test connected old constructor
ekuvardin 105ea0a
Check javadoc exception
ekuvardin 16d2b40
Changed consumer names
ekuvardin ac82f40
Divide tests
ekuvardin ce0160f
Some additional order guarantee
ekuvardin 5ffb0db
Change code due to CR
ekuvardin 2086142
Fix test after resolve bug with sync writer
ekuvardin c4cea58
Add ability rewrite predefined codec.
ekuvardin d52df35
Comment test due to bag in SyncWriter
ekuvardin a77588f
Update topic/src/main/java/tech/ydb/topic/TopicClient.java
pnv1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,83 @@ | ||
| package tech.ydb.topic.description; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.OutputStream; | ||
|
|
||
| /** | ||
|
|
||
| * | ||
| * Interface for custom codec implementation. | ||
| * <p> | ||
| * | ||
| * You can use custom codec as below | ||
| * 1. Implement interface methods | ||
| * Specify getId which return value more than 10000. This value identify codec across others | ||
| * 2. Use code below to write data | ||
| * Codec codecImpl = .... | ||
| * Topic client = TopicClient.newClient(ydbTransport).build(); | ||
| * <p> | ||
| * client.registerCodec(codecImpl); | ||
| * WriterSettings settings = WriterSettings.newBuilder() | ||
| * .setTopicPath(topicName) | ||
| * .setCodec(codecId) | ||
| * .build(); | ||
| * <p> | ||
| * SyncWriter writer = client.createSyncWriter(settings); | ||
| * <p> | ||
| * 3. Use to read data. Codec should be registered in {@link CodecRegistry} | ||
| * Codec codecImpl = .... | ||
| * Topic client = TopicClient.newClient(ydbTransport).build(); | ||
| * <p> | ||
| * ReaderSettings readerSettings = ReaderSettings.newBuilder() | ||
| * .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build()) | ||
| * .setConsumerName(TEST_CONSUMER1) | ||
| * .build(); | ||
| * <p> | ||
| * SyncReader reader = client.createSyncReader(readerSettings); | ||
| * | ||
| * @author Nikolay Perfilov | ||
| */ | ||
| public enum Codec { | ||
| RAW, | ||
| GZIP, | ||
| LZOP, | ||
| ZSTD, | ||
| CUSTOM; | ||
| public interface Codec { | ||
| int RAW = 1; | ||
| int GZIP = 2; | ||
| int LZOP = 3; | ||
| int ZSTD = 4; | ||
| int CUSTOM = 10000; | ||
|
|
||
| /** | ||
| * Check is codec is reserved | ||
| * | ||
| * @param codec codec id | ||
| * @return true - codec id is reserved; false - elsewhere | ||
| */ | ||
| default boolean isReserved(int codec) { | ||
| return codec <= CUSTOM; | ||
| } | ||
|
|
||
| /** | ||
| * Get codec identifier | ||
| * @return codec identifier | ||
| */ | ||
| int getId(); | ||
|
|
||
| /** | ||
| * Decode data | ||
| * | ||
| * @param byteArrayInputStream input stream | ||
| * @return output stream | ||
| * @throws IOException throws when error occurs | ||
| */ | ||
|
|
||
| InputStream decode(InputStream byteArrayInputStream) throws IOException; | ||
|
|
||
| /** | ||
| * Encode data | ||
| * | ||
| * @param byteArrayOutputStream output stream | ||
| * @return output stream | ||
| * @throws IOException throws when error occurs | ||
| */ | ||
| OutputStream encode(OutputStream byteArrayOutputStream) throws IOException; | ||
|
|
||
| } |
67 changes: 67 additions & 0 deletions
67
topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| package tech.ydb.topic.description; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import tech.ydb.topic.impl.GzipCodec; | ||
| import tech.ydb.topic.impl.LzopCodec; | ||
| import tech.ydb.topic.impl.RawCodec; | ||
| import tech.ydb.topic.impl.ZstdCodec; | ||
|
|
||
| /** | ||
| * Register for custom topic codec. Local to TopicClient | ||
| * | ||
| * @author Evgeny Kuvardin | ||
| **/ | ||
| public class CodecRegistry { | ||
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(CodecRegistry.class); | ||
|
|
||
| final Map<Integer, Codec> customCodecMap; | ||
|
|
||
| public CodecRegistry() { | ||
| customCodecMap = new HashMap<>(); | ||
| customCodecMap.put(Codec.RAW, RawCodec.getInstance()); | ||
| customCodecMap.put(Codec.GZIP, GzipCodec.getInstance()); | ||
| customCodecMap.put(Codec.LZOP, LzopCodec.getInstance()); | ||
| customCodecMap.put(Codec.ZSTD, ZstdCodec.getInstance()); | ||
| } | ||
|
|
||
| /** | ||
| * Register codec implementation | ||
| * @param codec codec implementation | ||
| * @return previous implementation with associated codec | ||
| */ | ||
| public Codec registerCodec(Codec codec) { | ||
| assert codec != null; | ||
| int codecId = codec.getId(); | ||
|
|
||
| if (codec.isReserved(codecId)) { | ||
| throw new RuntimeException( | ||
alex268 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000"); | ||
| } | ||
| Codec result = customCodecMap.put(codecId, codec); | ||
|
|
||
| if (result != null) { | ||
| logger.info( | ||
| "Replace codec which have already associated with this id. CodecId: {} Codec: {}", | ||
| codecId, | ||
| result); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Get codec implementation by associated id | ||
| * @param codecId codec identifier | ||
| * @return codec implementation | ||
| */ | ||
| public Codec getCodec(int codecId) { | ||
| return customCodecMap.get(codecId); | ||
| } | ||
|
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| package tech.ydb.topic.impl; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.OutputStream; | ||
| import java.util.zip.GZIPInputStream; | ||
| import java.util.zip.GZIPOutputStream; | ||
|
|
||
| import tech.ydb.topic.description.Codec; | ||
|
|
||
| /** | ||
| * Compression codec which implements the GZIP algorithm | ||
| */ | ||
| public class GzipCodec implements Codec { | ||
|
|
||
| private static final GzipCodec INSTANCE = new GzipCodec(); | ||
|
|
||
| private GzipCodec() { | ||
| } | ||
|
|
||
| /** | ||
| * Get single instance | ||
| * @return single instance of RawCodec | ||
| */ | ||
| public static GzipCodec getInstance() { | ||
| return INSTANCE; | ||
| } | ||
|
|
||
| @Override | ||
| public int getId() { | ||
| return Codec.GZIP; | ||
| } | ||
|
|
||
| @Override | ||
| public InputStream decode(InputStream byteArrayInputStream) throws IOException { | ||
| return new GZIPInputStream(byteArrayInputStream); | ||
| } | ||
|
|
||
| @Override | ||
| public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { | ||
| return new GZIPOutputStream(byteArrayOutputStream); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| package tech.ydb.topic.impl; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.OutputStream; | ||
|
|
||
| import org.anarres.lzo.LzoAlgorithm; | ||
| import org.anarres.lzo.LzoCompressor; | ||
| import org.anarres.lzo.LzoLibrary; | ||
| import org.anarres.lzo.LzopInputStream; | ||
| import org.anarres.lzo.LzopOutputStream; | ||
|
|
||
| import tech.ydb.topic.description.Codec; | ||
|
|
||
| /** | ||
| * Compression codec which implements the LZO algorithm | ||
| */ | ||
| public class LzopCodec implements Codec { | ||
|
|
||
| private static final LzopCodec INSTANCE = new LzopCodec(); | ||
|
|
||
| private LzopCodec() { | ||
| } | ||
|
|
||
| /** | ||
| * Get single instance | ||
| * @return single instance of RawCodec | ||
| */ | ||
| public static LzopCodec getInstance() { | ||
| return INSTANCE; | ||
| } | ||
|
|
||
| @Override | ||
| public int getId() { | ||
| return Codec.LZOP; | ||
| } | ||
|
|
||
| @Override | ||
| public InputStream decode(InputStream byteArrayInputStream) throws IOException { | ||
| return new LzopInputStream(byteArrayInputStream); | ||
| } | ||
|
|
||
| @Override | ||
| public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { | ||
| LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null); | ||
| return new LzopOutputStream(byteArrayOutputStream, lzoCompressor); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package tech.ydb.topic.impl; | ||
|
|
||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
| import java.io.OutputStream; | ||
|
|
||
| import tech.ydb.topic.description.Codec; | ||
|
|
||
| /** | ||
| * Default codec which don't do any encode and decode. | ||
| * | ||
| */ | ||
| public class RawCodec implements Codec { | ||
| private static final RawCodec INSTANCE = new RawCodec(); | ||
|
|
||
| private RawCodec() { | ||
| } | ||
|
|
||
| /** | ||
| * Get single instance | ||
| * @return single instance of RawCodec | ||
| */ | ||
| public static RawCodec getInstance() { | ||
| return INSTANCE; | ||
| } | ||
|
|
||
| @Override | ||
| public int getId() { | ||
| return Codec.RAW; | ||
| } | ||
|
|
||
| @Override | ||
| public InputStream decode(InputStream byteArrayInputStream) throws IOException { | ||
| return byteArrayInputStream; | ||
| } | ||
|
|
||
| @Override | ||
| public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { | ||
| return byteArrayOutputStream; | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.