Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions topic/src/main/java/tech/ydb/topic/TopicClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.topic.description.ConsumerDescription;
import tech.ydb.topic.description.CustomTopicCodec;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.impl.GrpcTopicRpc;
import tech.ydb.topic.impl.TopicClientImpl;
Expand Down Expand Up @@ -164,6 +165,23 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
@Override
void close();

/**
* Register custom codec implementation to TopicClient *
*
* @param codec - codec identifier (must be more than 10000)
* @param customTopicCodec - custom implementation
*/
void registerCodec(int codec, CustomTopicCodec customTopicCodec);


/**
* Unregister custom codec implementation
*
* @param codec - codec identifier (must be more than 10000)
* @return implementation was before
*/
CustomTopicCodec unregisterCodec(int codec);

/**
* BUILDER
*/
Expand Down
28 changes: 22 additions & 6 deletions topic/src/main/java/tech/ydb/topic/description/Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,27 @@

/**
* @author Nikolay Perfilov
*
* List of reserved codecs
*/
public enum Codec {
RAW,
GZIP,
LZOP,
ZSTD,
CUSTOM;
public class Codec {
public static final int RAW = 1;
public static final int GZIP = 2;
public static final int LZOP = 3;
public static final int ZSTD = 4;
public static final int CUSTOM = 10000;

private static final Codec INSTANCE = new Codec();

private Codec() {
}

public static Codec getInstance() {
return INSTANCE;
}

public boolean isReserved(int codec) {
return codec <= CUSTOM;
}

}
32 changes: 32 additions & 0 deletions topic/src/main/java/tech/ydb/topic/description/CodecRegistry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package tech.ydb.topic.description;

/**
* Interface for register custom codec
*
* @author Evgeny Kuvardin
**/
public interface CodecRegistry {

/**
* Register codec implementation
* @param codec codec identifier
* @param customTopicCodec codec implementation
* @return previous implementation with associated codec
*/
CustomTopicCodec registerCustomCodec(int codec, CustomTopicCodec customTopicCodec);

/**
* Unregister codec implementation
* @param codec codec identifier
* @return previous implementation with associated codec
*/
CustomTopicCodec unregisterCustomCodec(int codec);

/**
* Get codec implementation by associated id
* @param codec codec identifier
* @return codec implementation
*/
CustomTopicCodec getCustomCodec(int codec);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package tech.ydb.topic.description;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Register for custom topic codec. Local to TopicClient
*
* @author Evgeny Kuvardin
**/
public class CodecRegistryImpl implements CodecRegistry {

/**
* Make customCodecMap concurrent since register/unregister/read can be from different threads
*/
final Map<Integer, CustomTopicCodec> customCodecMap;

public CodecRegistryImpl() {
customCodecMap = new ConcurrentHashMap<>();
}

@Override
public CustomTopicCodec registerCustomCodec(int codec, CustomTopicCodec customTopicCodec) {
assert customTopicCodec != null;

if (Codec.getInstance().isReserved(codec)) {
throw new RuntimeException(
"Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000");
}

return customCodecMap.put(codec, customTopicCodec);
}

@Override
public CustomTopicCodec unregisterCustomCodec(int codec) {
if (Codec.getInstance().isReserved(codec)) {
throw new RuntimeException(
"Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000");
}

return customCodecMap.remove(codec);
}

@Override
public CustomTopicCodec getCustomCodec(int codec) {
return customCodecMap.get(codec);
}
}
8 changes: 4 additions & 4 deletions topic/src/main/java/tech/ydb/topic/description/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class Consumer {
private final String name;
private final boolean important;
private final Instant readFrom;
private final List<Codec> supportedCodecs;
private final List<Integer> supportedCodecs;
private final Map<String, String> attributes;
private final ConsumerStats stats;

Expand Down Expand Up @@ -68,7 +68,7 @@ public SupportedCodecs getSupportedCodecs() {
return new SupportedCodecs(supportedCodecs);
}

public List<Codec> getSupportedCodecsList() {
public List<Integer> getSupportedCodecsList() {
return supportedCodecs;
}

Expand All @@ -88,7 +88,7 @@ public static class Builder {
private String name;
private boolean important = false;
private Instant readFrom = null;
private List<Codec> supportedCodecs = new ArrayList<>();
private List<Integer> supportedCodecs = new ArrayList<>();
private Map<String, String> attributes = new HashMap<>();
private ConsumerStats stats = null;

Expand All @@ -107,7 +107,7 @@ public Builder setReadFrom(Instant readFrom) {
return this;
}

public Builder addSupportedCodec(Codec codec) {
public Builder addSupportedCodec(int codec) {
this.supportedCodecs.add(codec);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package tech.ydb.topic.description;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* Interface for custom codec implementation.
* <p>
* You can use custom codec as below
* 1. Implement interface methods
* 2. Use in write data
* CustomTopicCodec customCodecImpl = ....
* Topic client = TopicClient.newClient(ydbTransport).build();
* <p>
* client.registerCodec(10113, customCodecImpl);
* WriterSettings settings = WriterSettings.newBuilder()
* .setTopicPath(topicName)
* .setCodec(codecId)
* .build();
* <p>
* SyncWriter writer = client.createSyncWriter(settings);
* <p>
* 3. Use in read data
* CustomTopicCodec customCodecImpl = ....
* Topic client = TopicClient.newClient(ydbTransport).build();
* <p>
* ReaderSettings readerSettings = ReaderSettings.newBuilder()
* .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build())
* .setConsumerName(TEST_CONSUMER1)
* .build();
* <p>
* SyncReader reader = client.createSyncReader(readerSettings);
*
*/
public interface CustomTopicCodec {

/**
* Decode data
*
* @param byteArrayOutputStream input stream
* @return output stream
* @throws IOException throws when error occurs
*/
InputStream decode(ByteArrayInputStream byteArrayOutputStream) throws IOException;

/**
* Encode data
*
* @param byteArrayInputStream input stream
* @return output stream
* @throws IOException throws when error occurs
*/
OutputStream encode(ByteArrayOutputStream byteArrayInputStream) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
* @author Nikolay Perfilov
*/
public class SupportedCodecs {
private final List<Codec> codecs;
private final List<Integer> codecs;

public SupportedCodecs(Builder builder) {
this.codecs = ImmutableList.copyOf(builder.codecs);
}

public SupportedCodecs(List<Codec> codecs) {
public SupportedCodecs(List<Integer> codecs) {
this.codecs = codecs;
}

public List<Codec> getCodecs() {
public List<Integer> getCodecs() {
return codecs;
}

Expand All @@ -31,14 +31,14 @@ public static Builder newBuilder() {
* BUILDER
*/
public static class Builder {
private List<Codec> codecs = new ArrayList<>();
private List<Integer> codecs = new ArrayList<>();

public Builder addCodec(Codec codec) {
public Builder addCodec(int codec) {
codecs.add(codec);
return this;
}

public Builder setCodecs(List<Codec> supportedCodecs) {
public Builder setCodecs(List<Integer> supportedCodecs) {
this.codecs = supportedCodecs;
return this;
}
Expand Down
52 changes: 52 additions & 0 deletions topic/src/main/java/tech/ydb/topic/impl/CodecRegistryImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package tech.ydb.topic.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import tech.ydb.topic.description.Codec;
import tech.ydb.topic.description.CodecRegistry;
import tech.ydb.topic.description.CustomTopicCodec;

/**
* Register for custom topic codec. Local to TopicClient
*
* @author Evgeny Kuvardin
**/
public class CodecRegistryImpl implements CodecRegistry {

/**
* Make customCodecMap concurrent since register/unregister/read can be from different threads
*/
final Map<Integer, CustomTopicCodec> customCodecMap;

public CodecRegistryImpl() {
customCodecMap = new ConcurrentHashMap<>();
}

@Override
public CustomTopicCodec registerCustomCodec(int codec, CustomTopicCodec customTopicCodec) {
assert customTopicCodec != null;

if (Codec.getInstance().isReserved(codec)) {
throw new RuntimeException(
"Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000");
}

return customCodecMap.put(codec, customTopicCodec);
}

@Override
public CustomTopicCodec unregisterCustomCodec(int codec) {
if (Codec.getInstance().isReserved(codec)) {
throw new RuntimeException(
"Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000");
}

return customCodecMap.remove(codec);
}

@Override
public CustomTopicCodec getCustomCodec(int codec) {
return customCodecMap.get(codec);
}
}
Loading
Loading