diff --git a/topic/src/main/java/tech/ydb/topic/TopicClient.java b/topic/src/main/java/tech/ydb/topic/TopicClient.java index 39f7f2332..16eab0c4f 100644 --- a/topic/src/main/java/tech/ydb/topic/TopicClient.java +++ b/topic/src/main/java/tech/ydb/topic/TopicClient.java @@ -8,6 +8,7 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.topic.description.Codec; import tech.ydb.topic.description.ConsumerDescription; import tech.ydb.topic.description.TopicDescription; import tech.ydb.topic.impl.GrpcTopicRpc; @@ -164,6 +165,14 @@ default CompletableFuture> describeConsumer(String p @Override void close(); + /** + * Register custom codec implementation to TopicClient + * + * @param codec - custom implementation + */ + void registerCodec(Codec codec); + + /** * BUILDER */ diff --git a/topic/src/main/java/tech/ydb/topic/description/Codec.java b/topic/src/main/java/tech/ydb/topic/description/Codec.java index 1296d0c8d..893613dc8 100644 --- a/topic/src/main/java/tech/ydb/topic/description/Codec.java +++ b/topic/src/main/java/tech/ydb/topic/description/Codec.java @@ -1,12 +1,72 @@ package tech.ydb.topic.description; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + /** + + * + * Interface for custom codec implementation. + *

+ * + * You can use custom codec as below + * 1. Implement interface methods + * Specify getId which return value more than 10000. This value identify codec across others + * 2. Use code below to write data + * Codec codecImpl = .... + * Topic client = TopicClient.newClient(ydbTransport).build(); + *

+ * client.registerCodec(codecImpl); + * WriterSettings settings = WriterSettings.newBuilder() + * .setTopicPath(topicName) + * .setCodec(codecId) + * .build(); + *

+ * SyncWriter writer = client.createSyncWriter(settings); + *

+ * 3. Use to read data. Codec should be registered in {@link CodecRegistry} + * Codec codecImpl = .... + * Topic client = TopicClient.newClient(ydbTransport).build(); + *

+ * ReaderSettings readerSettings = ReaderSettings.newBuilder() + * .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build()) + * .setConsumerName(TEST_CONSUMER1) + * .build(); + *

+ * SyncReader reader = client.createSyncReader(readerSettings); + * * @author Nikolay Perfilov */ -public enum Codec { - RAW, - GZIP, - LZOP, - ZSTD, - CUSTOM; +public interface Codec { + int RAW = 1; + int GZIP = 2; + int LZOP = 3; + int ZSTD = 4; + + /** + * Get codec identifier + * @return codec identifier + */ + int getId(); + + /** + * Decode data + * + * @param byteArrayInputStream input stream + * @return output stream + * @throws IOException throws when error occurs + */ + + InputStream decode(InputStream byteArrayInputStream) throws IOException; + + /** + * Encode data + * + * @param byteArrayOutputStream output stream + * @return output stream + * @throws IOException throws when error occurs + */ + OutputStream encode(OutputStream byteArrayOutputStream) throws IOException; + } diff --git a/topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java b/topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java new file mode 100644 index 000000000..32ed3dd52 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java @@ -0,0 +1,63 @@ +package tech.ydb.topic.description; + +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.topic.impl.GzipCodec; +import tech.ydb.topic.impl.LzopCodec; +import tech.ydb.topic.impl.RawCodec; +import tech.ydb.topic.impl.ZstdCodec; + +/** + * Register for custom topic codec. Local to TopicClient + * + * @author Evgeny Kuvardin + **/ +public class CodecRegistry { + + private static final Logger logger = LoggerFactory.getLogger(CodecRegistry.class); + + final Map customCodecMap; + + public CodecRegistry() { + customCodecMap = new HashMap<>(); + customCodecMap.put(Codec.RAW, RawCodec.getInstance()); + customCodecMap.put(Codec.GZIP, GzipCodec.getInstance()); + customCodecMap.put(Codec.LZOP, LzopCodec.getInstance()); + customCodecMap.put(Codec.ZSTD, ZstdCodec.getInstance()); + } + + /** + * Register codec implementation + * @param codec codec implementation + * @return previous implementation with associated codec + */ + public Codec registerCodec(Codec codec) { + assert codec != null; + int codecId = codec.getId(); + + Codec result = customCodecMap.put(codecId, codec); + + if (result != null) { + logger.info( + "Replace codec which have already associated with this id. CodecId: {} Codec: {}", + codecId, + result); + } + + return result; + } + + /** + * Get codec implementation by associated id + * @param codecId codec identifier + * @return codec implementation + */ + public Codec getCodec(int codecId) { + return customCodecMap.get(codecId); + } + +} diff --git a/topic/src/main/java/tech/ydb/topic/description/Consumer.java b/topic/src/main/java/tech/ydb/topic/description/Consumer.java index 3dc85d45c..ec00d0b06 100644 --- a/topic/src/main/java/tech/ydb/topic/description/Consumer.java +++ b/topic/src/main/java/tech/ydb/topic/description/Consumer.java @@ -5,7 +5,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -14,7 +13,6 @@ import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.topic.YdbTopic; -import tech.ydb.topic.utils.ProtoUtils; /** * @author Nikolay Perfilov @@ -23,7 +21,7 @@ public class Consumer { private final String name; private final boolean important; private final Instant readFrom; - private final List supportedCodecs; + private final List supportedCodecs; private final Map attributes; private final ConsumerStats stats; @@ -40,8 +38,7 @@ public Consumer(YdbTopic.Consumer consumer) { this.name = consumer.getName(); this.important = consumer.getImportant(); this.readFrom = ProtobufUtils.protoToInstant(consumer.getReadFrom()); - this.supportedCodecs = consumer.getSupportedCodecs().getCodecsList() - .stream().map(ProtoUtils::codecFromProto).collect(Collectors.toList()); + this.supportedCodecs = new ArrayList<>(consumer.getSupportedCodecs().getCodecsList()); this.attributes = consumer.getAttributesMap(); this.stats = new ConsumerStats(consumer.getConsumerStats()); } @@ -68,7 +65,7 @@ public SupportedCodecs getSupportedCodecs() { return new SupportedCodecs(supportedCodecs); } - public List getSupportedCodecsList() { + public List getSupportedCodecsList() { return supportedCodecs; } @@ -88,7 +85,7 @@ public static class Builder { private String name; private boolean important = false; private Instant readFrom = null; - private List supportedCodecs = new ArrayList<>(); + private List supportedCodecs = new ArrayList<>(); private Map attributes = new HashMap<>(); private ConsumerStats stats = null; @@ -107,7 +104,7 @@ public Builder setReadFrom(Instant readFrom) { return this; } - public Builder addSupportedCodec(Codec codec) { + public Builder addSupportedCodec(int codec) { this.supportedCodecs.add(codec); return this; } diff --git a/topic/src/main/java/tech/ydb/topic/description/SupportedCodecs.java b/topic/src/main/java/tech/ydb/topic/description/SupportedCodecs.java index 09a4b126b..acea2fb45 100644 --- a/topic/src/main/java/tech/ydb/topic/description/SupportedCodecs.java +++ b/topic/src/main/java/tech/ydb/topic/description/SupportedCodecs.java @@ -9,17 +9,17 @@ * @author Nikolay Perfilov */ public class SupportedCodecs { - private final List codecs; + private final List codecs; public SupportedCodecs(Builder builder) { this.codecs = ImmutableList.copyOf(builder.codecs); } - public SupportedCodecs(List codecs) { + public SupportedCodecs(List codecs) { this.codecs = codecs; } - public List getCodecs() { + public List getCodecs() { return codecs; } @@ -31,14 +31,14 @@ public static Builder newBuilder() { * BUILDER */ public static class Builder { - private List codecs = new ArrayList<>(); + private List codecs = new ArrayList<>(); - public Builder addCodec(Codec codec) { + public Builder addCodec(int codec) { codecs.add(codec); return this; } - public Builder setCodecs(List supportedCodecs) { + public Builder setCodecs(List supportedCodecs) { this.codecs = supportedCodecs; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/impl/GzipCodec.java b/topic/src/main/java/tech/ydb/topic/impl/GzipCodec.java new file mode 100644 index 000000000..6774873a9 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/GzipCodec.java @@ -0,0 +1,43 @@ +package tech.ydb.topic.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import tech.ydb.topic.description.Codec; + +/** + * Compression codec which implements the GZIP algorithm + */ +public class GzipCodec implements Codec { + + private static final GzipCodec INSTANCE = new GzipCodec(); + + private GzipCodec() { + } + + /** + * Get single instance + * @return single instance of RawCodec + */ + public static GzipCodec getInstance() { + return INSTANCE; + } + + @Override + public int getId() { + return Codec.GZIP; + } + + @Override + public InputStream decode(InputStream byteArrayInputStream) throws IOException { + return new GZIPInputStream(byteArrayInputStream); + } + + @Override + public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { + return new GZIPOutputStream(byteArrayOutputStream); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/impl/LzopCodec.java b/topic/src/main/java/tech/ydb/topic/impl/LzopCodec.java new file mode 100644 index 000000000..2c98a8702 --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/LzopCodec.java @@ -0,0 +1,48 @@ +package tech.ydb.topic.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.anarres.lzo.LzoAlgorithm; +import org.anarres.lzo.LzoCompressor; +import org.anarres.lzo.LzoLibrary; +import org.anarres.lzo.LzopInputStream; +import org.anarres.lzo.LzopOutputStream; + +import tech.ydb.topic.description.Codec; + +/** + * Compression codec which implements the LZO algorithm + */ +public class LzopCodec implements Codec { + + private static final LzopCodec INSTANCE = new LzopCodec(); + + private LzopCodec() { + } + + /** + * Get single instance + * @return single instance of RawCodec + */ + public static LzopCodec getInstance() { + return INSTANCE; + } + + @Override + public int getId() { + return Codec.LZOP; + } + + @Override + public InputStream decode(InputStream byteArrayInputStream) throws IOException { + return new LzopInputStream(byteArrayInputStream); + } + + @Override + public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { + LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null); + return new LzopOutputStream(byteArrayOutputStream, lzoCompressor); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/impl/RawCodec.java b/topic/src/main/java/tech/ydb/topic/impl/RawCodec.java new file mode 100644 index 000000000..d9c20d68a --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/RawCodec.java @@ -0,0 +1,41 @@ +package tech.ydb.topic.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import tech.ydb.topic.description.Codec; + +/** + * Default codec which don't do any encode and decode. + * + */ +public class RawCodec implements Codec { + private static final RawCodec INSTANCE = new RawCodec(); + + private RawCodec() { + } + + /** + * Get single instance + * @return single instance of RawCodec + */ + public static RawCodec getInstance() { + return INSTANCE; + } + + @Override + public int getId() { + return Codec.RAW; + } + + @Override + public InputStream decode(InputStream byteArrayInputStream) throws IOException { + return byteArrayInputStream; + } + + @Override + public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { + return byteArrayOutputStream; + } +} diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java index 4e6261a92..08d5560cf 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java @@ -24,6 +24,7 @@ import tech.ydb.topic.TopicClient; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.description.Consumer; import tech.ydb.topic.description.ConsumerDescription; import tech.ydb.topic.description.MeteringMode; @@ -47,7 +48,6 @@ import tech.ydb.topic.settings.ReadEventHandlersSettings; import tech.ydb.topic.settings.ReaderSettings; import tech.ydb.topic.settings.WriterSettings; -import tech.ydb.topic.utils.ProtoUtils; import tech.ydb.topic.write.AsyncWriter; import tech.ydb.topic.write.SyncWriter; import tech.ydb.topic.write.impl.AsyncWriterImpl; @@ -63,9 +63,11 @@ public class TopicClientImpl implements TopicClient { private final TopicRpc topicRpc; private final Executor compressionExecutor; private final ExecutorService defaultCompressionExecutorService; + private final CodecRegistry codecRegistry; TopicClientImpl(TopicClientBuilderImpl builder) { this.topicRpc = builder.topicRpc; + this.codecRegistry = new CodecRegistry(); if (builder.compressionExecutor != null) { this.defaultCompressionExecutorService = null; this.compressionExecutor = builder.compressionExecutor; @@ -293,7 +295,7 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) { SupportedCodecs.Builder supportedCodecsBuilder = SupportedCodecs.newBuilder(); for (int codec : result.getSupportedCodecs().getCodecsList()) { - supportedCodecsBuilder.addCodec(ProtoUtils.codecFromProto(codec)); + supportedCodecsBuilder.addCodec(codec); } description.setSupportedCodecs(supportedCodecsBuilder.build()); @@ -305,12 +307,12 @@ private TopicDescription mapDescribeTopic(YdbTopic.DescribeTopicResult result) { @Override public SyncReader createSyncReader(ReaderSettings settings) { - return new SyncReaderImpl(topicRpc, settings); + return new SyncReaderImpl(topicRpc, settings, codecRegistry); } @Override public AsyncReader createAsyncReader(ReaderSettings settings, ReadEventHandlersSettings handlersSettings) { - return new AsyncReaderImpl(topicRpc, settings, handlersSettings); + return new AsyncReaderImpl(topicRpc, settings, handlersSettings, codecRegistry); } @Override @@ -328,12 +330,17 @@ public CompletableFuture commitOffset(String path, CommitOffsetSettings @Override public SyncWriter createSyncWriter(WriterSettings settings) { - return new SyncWriterImpl(topicRpc, settings, compressionExecutor); + return new SyncWriterImpl(topicRpc, settings, compressionExecutor, codecRegistry); } @Override public AsyncWriter createAsyncWriter(WriterSettings settings) { - return new AsyncWriterImpl(topicRpc, settings, compressionExecutor); + return new AsyncWriterImpl(topicRpc, settings, compressionExecutor, codecRegistry); + } + + @Override + public void registerCodec(Codec codec) { + codecRegistry.registerCodec(codec); } private static YdbTopic.MeteringMode toProto(MeteringMode meteringMode) { @@ -372,10 +379,10 @@ private static YdbTopic.Consumer toProto(Consumer consumer) { consumerBuilder.setReadFrom(ProtobufUtils.instantToProto(consumer.getReadFrom())); } - List supportedCodecs = consumer.getSupportedCodecsList(); + List supportedCodecs = consumer.getSupportedCodecsList(); if (!supportedCodecs.isEmpty()) { YdbTopic.SupportedCodecs.Builder codecBuilder = YdbTopic.SupportedCodecs.newBuilder(); - supportedCodecs.forEach(codec -> codecBuilder.addCodecs(ProtoUtils.toProto(codec))); + supportedCodecs.forEach(codecBuilder::addCodecs); consumerBuilder.setSupportedCodecs(codecBuilder.build()); } @@ -383,10 +390,10 @@ private static YdbTopic.Consumer toProto(Consumer consumer) { } private static YdbTopic.SupportedCodecs toProto(SupportedCodecs supportedCodecs) { - List supportedCodecsList = supportedCodecs.getCodecs(); + List supportedCodecsList = supportedCodecs.getCodecs(); YdbTopic.SupportedCodecs.Builder codecsBuilder = YdbTopic.SupportedCodecs.newBuilder(); - for (Codec codec : supportedCodecsList) { - codecsBuilder.addCodecs(tech.ydb.topic.utils.ProtoUtils.toProto(codec)); + for (Integer codec : supportedCodecsList) { + codecsBuilder.addCodecs(codec); } return codecsBuilder.build(); } diff --git a/topic/src/main/java/tech/ydb/topic/impl/ZstdCodec.java b/topic/src/main/java/tech/ydb/topic/impl/ZstdCodec.java new file mode 100644 index 000000000..e867812cc --- /dev/null +++ b/topic/src/main/java/tech/ydb/topic/impl/ZstdCodec.java @@ -0,0 +1,44 @@ +package tech.ydb.topic.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; + +import tech.ydb.topic.description.Codec; + +/** + * Compression codec which implements the ZSTD algorithm + */ +public class ZstdCodec implements Codec { + + private static final ZstdCodec INSTANCE = new ZstdCodec(); + + private ZstdCodec() { + } + + /** + * Get single instance + * @return single instance of RawCodec + */ + public static ZstdCodec getInstance() { + return INSTANCE; + } + + @Override + public int getId() { + return Codec.ZSTD; + } + + @Override + public InputStream decode(InputStream byteArrayInputStream) throws IOException { + return new ZstdInputStreamNoFinalizer(byteArrayInputStream); + } + + @Override + public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { + return new ZstdOutputStreamNoFinalizer(byteArrayOutputStream); + } +} diff --git a/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java b/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java index b311ea903..47f11d92e 100644 --- a/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java +++ b/topic/src/main/java/tech/ydb/topic/read/DecompressionException.java @@ -3,8 +3,6 @@ import java.io.IOException; import java.io.UncheckedIOException; -import tech.ydb.topic.description.Codec; - /** * @author Nikolay Perfilov */ @@ -12,9 +10,9 @@ public class DecompressionException extends UncheckedIOException { private static final long serialVersionUID = 2720187645859527813L; private final byte[] rawData; - private final Codec codec; + private final int codec; - public DecompressionException(String message, IOException cause, byte[] rawData, Codec codec) { + public DecompressionException(String message, IOException cause, byte[] rawData, int codec) { super(message, cause); this.rawData = rawData; this.codec = codec; @@ -30,7 +28,7 @@ public byte[] getRawData() { /** * @return Codec of message byte data */ - public Codec getCodec() { + public int getCodec() { return codec; } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java index 2cf21ba35..d7cd49c06 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/AsyncReaderImpl.java @@ -8,6 +8,8 @@ import java.util.concurrent.Executors; import java.util.function.Consumer; +import javax.annotation.Nonnull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +17,7 @@ import tech.ydb.core.Status; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.read.AsyncReader; import tech.ydb.topic.read.PartitionOffsets; import tech.ydb.topic.read.PartitionSession; @@ -45,8 +48,11 @@ public class AsyncReaderImpl extends ReaderImpl implements AsyncReader { private final ExecutorService defaultHandlerExecutorService; private final ReadEventHandler eventHandler; - public AsyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings, ReadEventHandlersSettings handlersSettings) { - super(topicRpc, settings); + public AsyncReaderImpl(TopicRpc topicRpc, + ReaderSettings settings, + ReadEventHandlersSettings handlersSettings, + @Nonnull CodecRegistry codecRegistry) { + super(topicRpc, settings, codecRegistry); this.eventHandler = handlersSettings.getEventHandler(); if (handlersSettings.getExecutor() != null) { diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java b/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java index f80ecbce3..8bd3fe017 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/Batch.java @@ -4,8 +4,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import tech.ydb.topic.description.Codec; - /** * @author Nikolay Perfilov */ @@ -36,7 +34,7 @@ public CompletableFuture getReadFuture() { return readFuture; } - public Codec getCodec() { + public int getCodec() { return meta.getCodec(); } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/BatchMeta.java b/topic/src/main/java/tech/ydb/topic/read/impl/BatchMeta.java index 80a0d1b5a..ecc8a3a11 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/BatchMeta.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/BatchMeta.java @@ -5,7 +5,6 @@ import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.topic.YdbTopic; -import tech.ydb.topic.description.Codec; /** * @author Nikolay Perfilov @@ -13,13 +12,13 @@ public class BatchMeta { private final String producerId; private final Map writeSessionMeta; - private final Codec codec; + private final int codec; private final Instant writtenAt; public BatchMeta(YdbTopic.StreamReadMessage.ReadResponse.Batch batch) { this.producerId = batch.getProducerId(); this.writeSessionMeta = batch.getWriteSessionMetaMap(); - this.codec = tech.ydb.topic.utils.ProtoUtils.codecFromProto(batch.getCodec()); + this.codec = batch.getCodec(); this.writtenAt = ProtobufUtils.protoToInstant(batch.getWrittenAt()); } @@ -31,7 +30,7 @@ public Map getWriteSessionMeta() { return writeSessionMeta; } - public Codec getCodec() { + public int getCodec() { return codec; } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java index 6e28f86c2..d4dc1a414 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/PartitionSessionImpl.java @@ -23,6 +23,7 @@ import tech.ydb.core.utils.ProtobufUtils; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.description.MetadataItem; import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.read.Message; @@ -54,10 +55,12 @@ public class PartitionSessionImpl { private final Consumer> commitFunction; private final NavigableMap> commitFutures = new ConcurrentSkipListMap<>(); private final ReentrantLock commitFuturesLock = new ReentrantLock(); + private final CodecRegistry codecRegistry; // Offset of the last read message + 1 private long lastReadOffset; private long lastCommittedOffset; + private PartitionSessionImpl(Builder builder) { this.id = builder.id; this.fullId = builder.fullId; @@ -70,6 +73,7 @@ private PartitionSessionImpl(Builder builder) { this.decompressionExecutor = builder.decompressionExecutor; this.dataEventCallback = builder.dataEventCallback; this.commitFunction = builder.commitFunction; + this.codecRegistry = builder.codecRegistry; logger.info("[{}] Partition session is started for Topic \"{}\" and Consumer \"{}\". CommittedOffset: {}. " + "Partition offsets: {}-{}", fullId, topicPath, consumerName, lastReadOffset, builder.partitionOffsets.getStart(), builder.partitionOffsets.getEnd()); @@ -285,7 +289,7 @@ private void decode(Batch batch) { batch.getMessages().forEach(message -> { try { - message.setData(Encoder.decode(batch.getCodec(), message.getData())); + message.setData(Encoder.decode(batch.getCodec(), message.getData(), this.codecRegistry)); message.setDecompressed(true); } catch (IOException exception) { message.setException(exception); @@ -381,6 +385,7 @@ public static class Builder { private Executor decompressionExecutor; private Function> dataEventCallback; private Consumer> commitFunction; + private CodecRegistry codecRegistry; public Builder setId(long id) { this.id = id; @@ -435,5 +440,10 @@ public Builder setCommitFunction(Consumer> commitFunction) { public PartitionSessionImpl build() { return new PartitionSessionImpl(this); } + + public Builder setCodecRegistry(CodecRegistry codecRegistry) { + this.codecRegistry = codecRegistry; + return this; + } } } diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java index 984134e06..473847fe4 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/ReaderImpl.java @@ -13,6 +13,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import javax.annotation.Nonnull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +27,7 @@ import tech.ydb.proto.StatusCodesProtos; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.description.OffsetsRange; import tech.ydb.topic.impl.GrpcStreamRetrier; import tech.ydb.topic.read.PartitionOffsets; @@ -49,16 +52,18 @@ public abstract class ReaderImpl extends GrpcStreamRetrier { private final Executor decompressionExecutor; private final ExecutorService defaultDecompressionExecutorService; private final AtomicReference> initResultFutureRef = new AtomicReference<>(null); + private final CodecRegistry codecRegistry; // Every reading stream has a sequential number (for debug purposes) private final AtomicLong seqNumberCounter = new AtomicLong(0); private final String consumerName; - public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { + public ReaderImpl(TopicRpc topicRpc, ReaderSettings settings, @Nonnull CodecRegistry codecRegistry) { super(topicRpc.getScheduler(), settings.getErrorsHandler()); this.topicRpc = topicRpc; this.settings = settings; this.session = new ReadSessionImpl(); + this.codecRegistry = codecRegistry; if (settings.getDecompressionExecutor() != null) { this.defaultDecompressionExecutorService = null; this.decompressionExecutor = settings.getDecompressionExecutor(); @@ -409,6 +414,7 @@ private void onStartPartitionSessionRequest(YdbTopic.StreamReadMessage.StartPart .setDecompressionExecutor(decompressionExecutor) .setDataEventCallback(ReaderImpl.this::handleDataReceivedEvent) .setCommitFunction((offsets) -> sendCommitOffsetRequest(partitionSessionId, partitionId, offsets)) + .setCodecRegistry(codecRegistry) .build(); partitionSessions.put(partitionSession.getId(), partitionSession); diff --git a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java index 6c3bd37cb..bd9d8637a 100644 --- a/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java +++ b/topic/src/main/java/tech/ydb/topic/read/impl/SyncReaderImpl.java @@ -12,6 +12,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -20,6 +21,7 @@ import tech.ydb.core.Status; import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.read.Message; import tech.ydb.topic.read.PartitionSession; import tech.ydb.topic.read.SyncReader; @@ -40,8 +42,8 @@ public class SyncReaderImpl extends ReaderImpl implements SyncReader { private final Condition queueIsNotEmptyCondition = queueLock.newCondition(); private int currentMessageIndex = 0; - public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings) { - super(topicRpc, settings); + public SyncReaderImpl(TopicRpc topicRpc, ReaderSettings settings, @Nonnull CodecRegistry codecRegistry) { + super(topicRpc, settings, codecRegistry); } private static class MessageBatchWrapper { diff --git a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java index 7f160c4d6..11327d356 100644 --- a/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java +++ b/topic/src/main/java/tech/ydb/topic/settings/WriterSettings.java @@ -16,7 +16,7 @@ public class WriterSettings { private final String producerId; private final String messageGroupId; private final Long partitionId; - private final Codec codec; + private final int codec; private final long maxSendBufferMemorySize; private final int maxSendBufferMessagesCount; private final BiConsumer errorsHandler; @@ -56,7 +56,7 @@ public Long getPartitionId() { return partitionId; } - public Codec getCodec() { + public int getCodec() { return codec; } @@ -76,7 +76,7 @@ public static class Builder { private String producerId = null; private String messageGroupId = null; private Long partitionId = null; - private Codec codec = Codec.GZIP; + private int codec = Codec.GZIP; private long maxSendBufferMemorySize = MAX_MEMORY_USAGE_BYTES_DEFAULT; private int maxSendBufferMessagesCount = MAX_IN_FLIGHT_COUNT_DEFAULT; private BiConsumer errorsHandler = null; @@ -130,7 +130,7 @@ public Builder setPartitionId(long partitionId) { * @param codec compression codec * @return settings builder */ - public Builder setCodec(Codec codec) { + public Builder setCodec(int codec) { this.codec = codec; return this; } diff --git a/topic/src/main/java/tech/ydb/topic/utils/Encoder.java b/topic/src/main/java/tech/ydb/topic/utils/Encoder.java index 88ef7e183..f705ad344 100644 --- a/topic/src/main/java/tech/ydb/topic/utils/Encoder.java +++ b/topic/src/main/java/tech/ydb/topic/utils/Encoder.java @@ -5,46 +5,65 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; -import com.github.luben.zstd.ZstdInputStreamNoFinalizer; -import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; -import org.anarres.lzo.LzoAlgorithm; -import org.anarres.lzo.LzoCompressor; -import org.anarres.lzo.LzoLibrary; -import org.anarres.lzo.LzopInputStream; -import org.anarres.lzo.LzopOutputStream; +import javax.annotation.Nonnull; import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.CodecRegistry; /** + * Class accumulated logic for encode and decode messages + * * @author Nikolay Perfilov */ public class Encoder { - private Encoder() { } + private Encoder() { + } - public static byte[] encode(Codec codec, byte[] input) throws IOException { + /** + * Encode messages + * + * @param codec codec identifier + * @param input byte array of data to be encoded + * @param codecRegistry contains custom codecs + * @return encoded data + * @throws IOException throws when error has happened + */ + public static byte[] encode(int codec, + @Nonnull byte[] input, + @Nonnull CodecRegistry codecRegistry) throws IOException { if (codec == Codec.RAW) { return input; } + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - try (OutputStream os = makeOutputStream(codec, byteArrayOutputStream)) { + try (OutputStream os = makeOutputStream(codec, byteArrayOutputStream, codecRegistry)) { os.write(input); } return byteArrayOutputStream.toByteArray(); } - public static byte[] decode(Codec codec, byte[] input) throws IOException { + /** + * Decode messages + * + * @param codec codec identifier + * @param input byte array of data to be decoded + * @param codecRegistry contains custom codecs + * @return decoded data + * @throws IOException throws when error has happened + */ + public static byte[] decode(int codec, + @Nonnull byte[] input, + @Nonnull CodecRegistry codecRegistry) throws IOException { if (codec == Codec.RAW) { return input; } try ( - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input); - InputStream is = makeInputStream(codec, byteArrayInputStream) + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input); + InputStream is = makeInputStream(codec, byteArrayInputStream, codecRegistry) ) { byte[] buffer = new byte[1024]; int length; @@ -55,35 +74,27 @@ public static byte[] decode(Codec codec, byte[] input) throws IOException { } } - private static OutputStream makeOutputStream(Codec codec, - ByteArrayOutputStream byteArrayOutputStream) throws IOException { - switch (codec) { - case GZIP: - return new GZIPOutputStream(byteArrayOutputStream); - case ZSTD: - return new ZstdOutputStreamNoFinalizer(byteArrayOutputStream); - case LZOP: - LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null); - return new LzopOutputStream(byteArrayOutputStream, lzoCompressor); - case CUSTOM: - default: - throw new RuntimeException("Unsupported codec: " + codec); - } + private static OutputStream makeOutputStream(int codecId, + @Nonnull ByteArrayOutputStream byteArrayOutputStream, + @Nonnull CodecRegistry codecRegistry) throws IOException { + Codec codec = getCodec(codecId, codecRegistry); + + return codec.encode(byteArrayOutputStream); } - private static InputStream makeInputStream(Codec codec, - ByteArrayInputStream byteArrayInputStream) throws IOException { - switch (codec) { - case GZIP: - return new GZIPInputStream(byteArrayInputStream); - case ZSTD: - return new ZstdInputStreamNoFinalizer(byteArrayInputStream); - case LZOP: - return new LzopInputStream(byteArrayInputStream); - case CUSTOM: - default: - throw new RuntimeException("Unsupported codec: " + codec); - } + private static InputStream makeInputStream(int codecId, + @Nonnull ByteArrayInputStream byteArrayInputStream, + @Nonnull CodecRegistry codecRegistry) throws IOException { + Codec codec = getCodec(codecId, codecRegistry); + + return codec.decode(byteArrayInputStream); } + private static @Nonnull Codec getCodec(int codecId, @Nonnull CodecRegistry codecRegistry) { + Codec codec = codecRegistry.getCodec(codecId); + if (codec == null) { + throw new RuntimeException("Unsupported codec: " + codecId); + } + return codec; + } } diff --git a/topic/src/main/java/tech/ydb/topic/utils/ProtoUtils.java b/topic/src/main/java/tech/ydb/topic/utils/ProtoUtils.java deleted file mode 100644 index 25d72a5c8..000000000 --- a/topic/src/main/java/tech/ydb/topic/utils/ProtoUtils.java +++ /dev/null @@ -1,46 +0,0 @@ -package tech.ydb.topic.utils; - -import tech.ydb.proto.topic.YdbTopic; -import tech.ydb.topic.description.Codec; - -/** - * @author Nikolay Perfilov - */ -public class ProtoUtils { - - private ProtoUtils() { } - - public static int toProto(Codec codec) { - switch (codec) { - case RAW: - return YdbTopic.Codec.CODEC_RAW_VALUE; - case GZIP: - return YdbTopic.Codec.CODEC_GZIP_VALUE; - case LZOP: - return YdbTopic.Codec.CODEC_LZOP_VALUE; - case ZSTD: - return YdbTopic.Codec.CODEC_ZSTD_VALUE; - case CUSTOM: - return YdbTopic.Codec.CODEC_CUSTOM_VALUE; - default: - throw new RuntimeException("Cannot convert codec to proto. Unknown codec value: " + codec); - } - } - - public static Codec codecFromProto(int codec) { - switch (codec) { - case YdbTopic.Codec.CODEC_RAW_VALUE: - return Codec.RAW; - case YdbTopic.Codec.CODEC_GZIP_VALUE: - return Codec.GZIP; - case YdbTopic.Codec.CODEC_LZOP_VALUE: - return Codec.LZOP; - case YdbTopic.Codec.CODEC_ZSTD_VALUE: - return Codec.ZSTD; - case YdbTopic.Codec.CODEC_CUSTOM_VALUE: - return Codec.CUSTOM; - default: - throw new RuntimeException("Unknown codec value from proto: " + codec); - } - } -} diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java index 0c21ded55..e1e0ab7e6 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/AsyncWriterImpl.java @@ -4,7 +4,10 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; +import javax.annotation.Nonnull; + import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.AsyncWriter; @@ -18,8 +21,11 @@ */ public class AsyncWriterImpl extends WriterImpl implements AsyncWriter { - public AsyncWriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { - super(topicRpc, settings, compressionExecutor); + public AsyncWriterImpl(TopicRpc topicRpc, + WriterSettings settings, + Executor compressionExecutor, + @Nonnull CodecRegistry codecRegistry) { + super(topicRpc, settings, compressionExecutor, codecRegistry); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java b/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java index c1ad47630..4e478cf31 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/MessageSender.java @@ -15,7 +15,6 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.description.MetadataItem; import tech.ydb.topic.settings.WriterSettings; -import tech.ydb.topic.utils.ProtoUtils; /** * Utility class that splits messages into several requests so that every request would be less than grpc size limit @@ -80,7 +79,7 @@ public void setSession(WriteSession session) { private void reset() { writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder() - .setCodec(ProtoUtils.toProto(settings.getCodec())); + .setCodec(settings.getCodec()); messageCount = 0; totalMessageDataProtoSize = 0; } @@ -124,7 +123,7 @@ public void sendWriteRequest() { messages.subList(firstHalfMessagesCount, messages.size()) )) { writeRequestBuilder = YdbTopic.StreamWriteMessage.WriteRequest.newBuilder() - .setCodec(ProtoUtils.toProto(settings.getCodec())); + .setCodec(settings.getCodec()); writeRequestBuilder.addAllMessages(sublist); YdbTopic.StreamWriteMessage.FromClient subRequest = YdbTopic.StreamWriteMessage.FromClient .newBuilder() diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java index 31de1ca2b..d396ae668 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java @@ -5,7 +5,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; + import tech.ydb.topic.TopicRpc; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; import tech.ydb.topic.write.InitResult; @@ -18,8 +21,11 @@ public class SyncWriterImpl extends WriterImpl implements SyncWriter { //private static final Logger logger = LoggerFactory.getLogger(SyncWriterImpl.class); - public SyncWriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { - super(topicRpc, settings, compressionExecutor); + public SyncWriterImpl(TopicRpc topicRpc, + WriterSettings settings, + Executor compressionExecutor, + @Nonnull CodecRegistry codecRegistry) { + super(topicRpc, settings, compressionExecutor, codecRegistry); } @Override diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java index 49a7e7ebd..90cecaeab 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java @@ -14,6 +14,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nonnull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +26,7 @@ import tech.ydb.proto.topic.YdbTopic; import tech.ydb.topic.TopicRpc; import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.CodecRegistry; import tech.ydb.topic.impl.GrpcStreamRetrier; import tech.ydb.topic.settings.SendSettings; import tech.ydb.topic.settings.WriterSettings; @@ -59,13 +62,21 @@ public abstract class WriterImpl extends GrpcStreamRetrier { // Every writing stream has a sequential number (for debug purposes) private final AtomicLong sessionSeqNumberCounter = new AtomicLong(0); + /** + * Register for custom codec. User can specify custom codec which is local to TopicClient + */ + private final CodecRegistry codecRegistry; + private Boolean isSeqNoProvided = null; private int currentInFlightCount = 0; private long availableSizeBytes; // Future for flush method private CompletableFuture lastAcceptedMessageFuture; - public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressionExecutor) { + public WriterImpl(TopicRpc topicRpc, + WriterSettings settings, + Executor compressionExecutor, + @Nonnull CodecRegistry codecRegistry) { super(topicRpc.getScheduler(), settings.getErrorsHandler()); this.topicRpc = topicRpc; this.settings = settings; @@ -73,6 +84,7 @@ public WriterImpl(TopicRpc topicRpc, WriterSettings settings, Executor compressi this.availableSizeBytes = settings.getMaxSendBufferMemorySize(); this.maxSendBufferMemorySize = settings.getMaxSendBufferMemorySize(); this.compressionExecutor = compressionExecutor; + this.codecRegistry = codecRegistry; String message = "Writer" + " (generated id " + id + ")" + " created for topic \"" + settings.getTopicPath() + "\"" + @@ -180,7 +192,8 @@ private void encode(EnqueuedMessage message) { return; } try { - message.getMessage().setData(Encoder.encode(settings.getCodec(), message.getMessage().getData())); + message.getMessage().setData(Encoder.encode(settings.getCodec(), + message.getMessage().getData(), codecRegistry)); } catch (IOException exception) { throw new RuntimeException("Couldn't encode a message", exception); } diff --git a/topic/src/test/java/tech/ydb/topic/impl/CodecRegistryTest.java b/topic/src/test/java/tech/ydb/topic/impl/CodecRegistryTest.java new file mode 100644 index 000000000..35e0baa77 --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/CodecRegistryTest.java @@ -0,0 +1,90 @@ +package tech.ydb.topic.impl; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.CodecRegistry; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Unit tests for check simple logic for register custom codec + * + * @author Evgeny Kuvardin + */ +public class CodecRegistryTest { + CodecRegistry registry; + + private static final int codecId = 10113; + + @Before + public void beforeTest() { + registry = new CodecRegistry(); + } + + @Test + public void registerCustomCodecShouldDoubleRegisterCodecAndReturnLastCodec() { + Codec codec1 = new CodecTopic(); + Codec codec2 = new CodecTopic(); + + registry.registerCodec(codec1); + Assert.assertEquals(codec1, registry.registerCodec(codec2)); + + Assert.assertEquals(codec2, registry.getCodec(codecId)); + Assert.assertNotEquals(codec1, registry.getCodec(codecId)); + } + + @Test + public void registerCustomCodecShouldNotAcceptNull() { + Assert.assertThrows( + AssertionError.class, + () -> registry.registerCodec(null)); + } + + @Test + public void registerCustomCodecShouldRegisterAndOverrideAnyCodec() { + CodecTopic codec1 = new CodecTopic(); + expectRegisterCodec(1, codec1, RawCodec.getInstance()); + expectRegisterCodec(2, codec1, GzipCodec.getInstance()); + expectRegisterCodec(3, codec1, LzopCodec.getInstance()); + expectRegisterCodec(4, codec1, ZstdCodec.getInstance()); + } + + void expectRegisterCodec(int codecId, CodecTopic newCodec, Codec oldCodec) { + newCodec.setCodecId(codecId); + Codec codecOldPredefined = registry.registerCodec(newCodec); + Assert.assertSame(codecOldPredefined, oldCodec); + } + + static class CodecTopic implements Codec { + + int codec; + + public CodecTopic() { + this.codec = codecId; + } + + public void setCodecId(int codecId) { + this.codec = codecId; + } + + @Override + public int getId() { + return codec; + } + + @Override + public InputStream decode(InputStream byteArrayInputStream) throws IOException { + return null; + } + + @Override + public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { + return null; + } + } +} diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsCodecIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsCodecIntegrationTest.java new file mode 100644 index 000000000..af232260a --- /dev/null +++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsCodecIntegrationTest.java @@ -0,0 +1,668 @@ +package tech.ydb.topic.impl; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import tech.ydb.core.Status; +import tech.ydb.test.junit4.GrpcTransportRule; +import tech.ydb.topic.TopicClient; +import tech.ydb.topic.description.Codec; +import tech.ydb.topic.description.Consumer; +import tech.ydb.topic.read.DecompressionException; +import tech.ydb.topic.read.SyncReader; +import tech.ydb.topic.settings.CreateTopicSettings; +import tech.ydb.topic.settings.ReaderSettings; +import tech.ydb.topic.settings.TopicReadSettings; +import tech.ydb.topic.settings.WriterSettings; +import tech.ydb.topic.write.Message; +import tech.ydb.topic.write.SyncWriter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + + +/** + * Test connecting to read write using all available codec + * + * @author Evgeny Kuvardin + */ +public class YdbTopicsCodecIntegrationTest { + private final static Logger logger = LoggerFactory.getLogger(YdbTopicsCodecIntegrationTest.class); + + @ClassRule + public final static GrpcTransportRule ydbTransport = new GrpcTransportRule(); + + private final static String TEST_TOPIC1 = "integration_test_custom_codec_topic1"; + private final static String TEST_TOPIC2 = "integration_test_custom_codec_topic2"; + private final static String TEST_CONSUMER1 = "consumer_codec"; + private final static String TEST_CONSUMER2 = "other_consumer_codec"; + + private final List topicToDelete = new ArrayList<>(); + private final List clientToClose = new ArrayList<>(); + + TopicClient client1; + + private final static String[] TEST_MESSAGES = new String[]{ + "Test message", + "", + " ", + "Other message", + "Last message", + }; + + Map> queueOfMessages = new HashMap<>(); + + @Before + public void beforeEachTest() { + topicToDelete.clear(); + clientToClose.clear(); + } + + @After + public void afterEachTest() { + for (String s : topicToDelete) { + deleteTopic(s); + } + + for (TopicClient topicClient : clientToClose) { + topicClient.close(); + } + + queueOfMessages.clear(); + } + + /** + * Ability to use custom codec with write and read + * This positive test checks that we can read and write in one topic + *

+ * STEPS + * 1. Create client + * 2. Create topic TEST_TOPIC1 + * 3. Create custom codec + * 4. Register codec with id = 10113 and CustomTopicCodec + * 5. Write data to topic with codec = 10113 + * 6. Read data from topic without errors + * + */ + @Test + public void writeDataAndReadDataWithCustomCodec() throws InterruptedException, ExecutionException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec = new CustomCodec(1, 10113); + + client1.registerCodec(codec); + + writeData(10113, TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * This test checks that in one client we can make arbitrary codecs which don't disturb each other + *

+ * STEPS + * 1. Create client + * 2.1. Create topic TEST_TOPIC1 + * 2.2. Create topic TEST_TOPIC2 + * 3.1. Create custom codec1 + * 3.2. Create custom codec2 + * 4.1. Register codec with id = 10113 and codec1 + * 4.2. Register codec with id = 10114 and codec2 + * 5.1. Write data to TEST_TOPIC1 with codec = 10113 + * 5.1. Write data to TEST_TOPIC2 with codec = 10114 + * 6.1. Read data from TEST_TOPIC1 without errors + * 6.1. Read data from TEST_TOPIC2 without errors + * + */ + @Test + public void writeInTwoTopicsInOneClientWithDifferentCustomCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + + createTopic(client1, TEST_TOPIC1); + createTopic(client1, TEST_TOPIC2); + + Codec codec1 = new CustomCodec(1, 10113); + Codec codec2 = new CustomCodec(7, 10114); + + client1.registerCodec(codec1); + client1.registerCodec(codec2); + + writeData(10113, TEST_TOPIC1, client1); + writeData(10114, TEST_TOPIC2, client1); + + readData(TEST_TOPIC1, client1); + readData(TEST_TOPIC2, client1); + } + + /** + * This test checks that different client don't exchange CodecRegistry with each other + *

+ * STEPS + * 1.1. Create client1 + * 1.2. Create client2 + * 2.1. Create topic TEST_TOPIC1 in client1 + * 2.2. Create topic TEST_TOPIC2 in client2 + * 3.1. Create custom codec1 + * 3.2. Create custom codec2 + * 4.1. Register codec with id = 10113 and codec1 + * 4.2. Register codec with id = 10113 and codec2 + * 5.1. Write data to TEST_TOPIC1 with codec = 10113 + * 5.1. Write data to TEST_TOPIC2 with codec = 10113 + * 6.1. Read data from TEST_TOPIC1 without errors + * 6.1. Read data from TEST_TOPIC2 without errors + * + */ + @Test + public void writeInTwoTopicWithDifferentCodecWithOneIdShouldNotFailed() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + TopicClient client2 = createClient(); + + createTopic(client1, TEST_TOPIC1); + createTopic(client2, TEST_TOPIC2); + + Codec codec1 = new CustomCodec(1, 10113); + Codec codec2 = new CustomCodec(7, 10113); + + client1.registerCodec(codec1); + client2.registerCodec(codec2); + + writeData(10113, TEST_TOPIC1, client1); + writeData(10113, TEST_TOPIC2, client2); + + readData(TEST_TOPIC1, client1); + readData(TEST_TOPIC2, client2); + + client2.close(); + } + + /** + * This test checks that overwrite existing codec with not backward compatibility will give an error + *

+ * STEPS + * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3.1. Create custom codec1 + * 3.2. Create custom codec2 + * 4. Register codec with id = 10113 and codec1 + * 5. Write data to TEST_TOPIC1 with codec = 10113 + * 6. Register codec with id = 10113 and codec2 + * 7. Read data from TEST_TOPIC1 with errors + * + */ + @Test + public void readUsingWrongCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec1 = new CustomCodec(1,10113); + Codec codec2 = new CustomCodec(7, 10113); + + client1.registerCodec(codec1); + + writeData(10113, TEST_TOPIC1, client1); + + client1.registerCodec(codec2); + + readDataFail(TEST_TOPIC1, client1); + } + + + /** + * Test checks that we can write in one TopicUsing differentCodecs + *

+ * STEPS + * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3.1. Create custom codec1 + * 3.2. Create custom codec2 + * 4.1. Register codec with id = 10113 and codec1 + * 4.2. Register codec with id = 10113 and codec2 + * 7. Write data with codec 1, 10014, 2, 4, 10113, 3 + * 8. Read data without errors + * + */ + @Test + public void writeInOneTopicWithDifferentCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec1 = new CustomCodec(1, 10113); + Codec codec2 = new CustomCodec(7 , 10114); + + client1.registerCodec(codec1); + client1.registerCodec(codec2); + + writeData(Codec.RAW, TEST_TOPIC1, client1); + writeData(10114, TEST_TOPIC1, client1); + writeData(Codec.GZIP, TEST_TOPIC1, client1); + writeData(Codec.ZSTD, TEST_TOPIC1, client1); + writeData(10113, TEST_TOPIC1, client1); + writeData(Codec.LZOP, TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * In this test we verify that decode failed when code not found but after specify correct codec + * Messages reads again and will be decoded + *

+ * 1. Create client1 and client2 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Create custom codec1 + * 4. Register codec with id = 10113 and codec1 + * 5. Write data with codec 10113 + * 6. Read data with errors in client2 + * 7 Once again register codec with id = 10113 and codec1 + * 8. Read data without errors + * + */ + @Test + public void readShouldFailIfWithNotRegisteredCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + TopicClient client2 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec1 = new CustomCodec(1, 10113); + + client1.registerCodec(codec1); + writeData(10113, TEST_TOPIC1, client1); + + readDataWithError(TEST_TOPIC1, client2); + + client2.registerCodec(codec1); + readData(TEST_TOPIC1, client2); + } + + /** + * Test checks that we can't write into topic with unknown codec + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Try to write with reserved codec 7 -> get error + * 4. Try to write with reserved codec 10000 -> get error + * 5. Try to write with custom unregister codec 20000 -> get error + */ + @Ignore + @Test + public void writeWithReservedNotExistedCodec() { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Exception e = Assert.assertThrows(RuntimeException.class, () -> writeData(7, TEST_TOPIC1, client1)); + Assert.assertTrue(e.getMessage().contains("Unsupported codec: " + 7)); + } + + /** + * Create one more defect. Test failed for unknown reason. Seems RuntimeException produce some weird behaviour + */ + @Ignore + @Test + public void writeWithCustomCodec10000() { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Exception e = Assert.assertThrows(Exception.class, () -> writeData(10000, TEST_TOPIC1, client1)); + Assert.assertEquals("Unsupported codec: " + 10000, e.getCause().getMessage()); + } + + /** + * Test checks that we can write and read using RAW Codec + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Try to write + * 4. Read data + */ + @Test + public void readWriteRawCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + writeData(Codec.RAW, TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * The test checks that we can rewrite the predefined RAW codec. + * Please note that modifying a RAW codec is highly unusual and potentially risky. + * You take full responsibility for any consequences that may result. + * The SDK includes mechanisms in some parts of the codec that attempt to optimize the code + * and detect write or read operations to RAW codecs. + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Create custom codec + * 4, Register codec + * 5. Try to write + * 6. Read data + */ + @Test + public void userCanRewriteRawCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec = new CustomCodec(0, Codec.RAW); + client1.registerCodec(codec); + + writeData(codec.getId(), TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * Test checks that we can write and read using GZIP Codec + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Try to write + * 4. Read data + */ + @Test + public void readWriteGzipCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + writeData(Codec.GZIP, TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * The test checks that we can rewrite the predefined Gzip codec. + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Create custom codec + * 4, Register codec + * 5. Try to write + * 6. Read data + */ + @Test + public void userCanRewriteGzipCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec = new CustomCodec(2, Codec.GZIP); + client1.registerCodec(codec); + + writeData(codec.getId(), TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * Test checks that we can write and read using Lzop Codec + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Try to write + * 4. Read data + */ + @Test + public void readWriteLzopCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + writeData(Codec.LZOP, TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * The test checks that we can rewrite the predefined Lzop codec. + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Create custom codec + * 4, Register codec + * 5. Try to write + * 6. Read data + */ + @Test + public void userCanRewriteLzopCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec = new CustomCodec(3, Codec.LZOP); + client1.registerCodec(codec); + + writeData(codec.getId(), TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * Test checks that we can write and read using Zstd Codec + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Try to write + * 4. Read data + */ + @Test + public void readWriteZstdCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + writeData(Codec.ZSTD, TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + /** + * The test checks that we can rewrite the predefined Lzop codec. + *

+ * 1. Create client1 + * 2. Create topic TEST_TOPIC1 in client1 + * 3. Create custom codec + * 4, Register codec + * 5. Try to write + * 6. Read data + */ + @Test + public void userCanRewriteZstdCodec() throws ExecutionException, InterruptedException, TimeoutException { + client1 = createClient(); + createTopic(client1, TEST_TOPIC1); + + Codec codec = new CustomCodec(4, Codec.ZSTD); + client1.registerCodec(codec); + + writeData(codec.getId(), TEST_TOPIC1, client1); + + readData(TEST_TOPIC1, client1); + } + + private TopicClient createClient() { + TopicClient topicClient = TopicClient.newClient(ydbTransport).build(); + clientToClose.add(topicClient); + return topicClient; + } + + private void createTopic(TopicClient client, String topicName) { + logger.info("Create test topic {} ...", topicName); + + client.createTopic(topicName, CreateTopicSettings.newBuilder() + .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build()) + .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER2).build()) + .build() + ).join().expectSuccess("can't create a new topic"); + + topicToDelete.add(topicName); + } + + private void deleteTopic(String topicName) { + logger.info("Drop test topic {} ...", topicName); + Status dropStatus = client1.dropTopic(topicName).join(); + client1.close(); + dropStatus.expectSuccess("can't drop test topic"); + } + + private void writeData(int codecId, String topicName, TopicClient client) throws ExecutionException, InterruptedException, TimeoutException { + byte[][] testMessages = new byte[][]{ + (TEST_MESSAGES[0] + codecId).getBytes(), + TEST_MESSAGES[1].getBytes(), + TEST_MESSAGES[2].getBytes(), + (TEST_MESSAGES[3] + codecId).getBytes(), + (TEST_MESSAGES[4] + codecId).getBytes(), + }; + + writeData(codecId, topicName, client, testMessages); + } + + private void writeData(int codecId, String topicName, TopicClient client, byte[][] testMessages) throws ExecutionException, InterruptedException, TimeoutException { + WriterSettings settings = WriterSettings.newBuilder() + .setTopicPath(topicName) + .setCodec(codecId) + .build(); + SyncWriter writer = client.createSyncWriter(settings); + writeData(writer, topicName, testMessages); + } + + private void writeData(SyncWriter writer, String topicName, byte[][] testMessages) throws ExecutionException, InterruptedException, TimeoutException { + writer.initAndWait(); + + Deque deque = queueOfMessages.computeIfAbsent(topicName, k -> new ArrayDeque<>()); + deque.add(testMessages); + + for (byte[] testMessage : testMessages) { + writer.send(Message.newBuilder().setData(testMessage).build()); + } + + writer.flush(); + writer.shutdown(1, TimeUnit.MINUTES); + } + + private void readData(String topicName, TopicClient client) throws InterruptedException { + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build()) + .setConsumerName(TEST_CONSUMER1) + .build(); + + SyncReader reader = client.createSyncReader(readerSettings); + reader.initAndWait(); + + while (!queueOfMessages.get(topicName).isEmpty()) { + byte[][] testMessages = queueOfMessages.get(topicName).poll(); + + Assert.assertNotNull(testMessages); + for (byte[] bytes : testMessages) { + tech.ydb.topic.read.Message msg = reader.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertArrayEquals(bytes, msg.getData()); + } + } + reader.shutdown(); + } + + private void readDataFail(String topicName, TopicClient client) throws InterruptedException { + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build()) + .setConsumerName(TEST_CONSUMER1) + .build(); + + SyncReader reader = client.createSyncReader(readerSettings); + reader.initAndWait(); + + while (!queueOfMessages.get(topicName).isEmpty()) { + byte[][] testMessages = queueOfMessages.get(topicName).poll(); + Assert.assertNotNull(testMessages); + for (byte[] bytes : testMessages) { + tech.ydb.topic.read.Message msg = reader.receive(1, TimeUnit.SECONDS); + if (bytes.length != 0 && // nothing to decode + msg != null) // uncatch error has happened and that is what we want + { + Assert.assertFalse(java.util.Arrays.equals(bytes, msg.getData())); + } + } + } + + reader.shutdown(); + } + + private void readDataWithError(String topicName, TopicClient client) throws InterruptedException { + ReaderSettings readerSettings = ReaderSettings.newBuilder() + .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build()) + .setConsumerName(TEST_CONSUMER1) + .build(); + + SyncReader reader = client.createSyncReader(readerSettings); + reader.initAndWait(); + + while (!queueOfMessages.get(topicName).isEmpty()) { + byte[][] testMessages = queueOfMessages.get(topicName).poll(); + Assert.assertNotNull(testMessages); + for (byte[] bytes : testMessages) { + tech.ydb.topic.read.Message msg = reader.receive(1, TimeUnit.SECONDS); + if (bytes.length != 0 && // nothing to decode + msg != null) // uncatch error has happened and that is what we want + { + Assert.assertThrows(DecompressionException.class, msg::getData); + } + } + } + + reader.shutdown(); + } + + + static class CustomCodec implements Codec { + + final int stub; + final int codecId; + + public CustomCodec(int stub, int codecId) { + this.stub = stub; + this.codecId = codecId; + } + + @Override + public int getId() { + return codecId; + } + + @Override + public InputStream decode(InputStream inputStream) throws IOException { + return new InputStream() { + @Override + public int read() throws IOException { + for (int i = 0; i < stub; i++) { + inputStream.read(); + } + + return inputStream.read(); + } + }; + } + + @Override + public OutputStream encode(OutputStream byteArrayOutputStream) throws IOException { + return new OutputStream() { + @Override + public void write(int b) throws IOException { + for (int i = 0; i < stub; i++) { + byteArrayOutputStream.write(stub); + } + byteArrayOutputStream.write(b); + } + }; + } + } +}