Skip to content

Commit 5ffb0db

Browse files
committed
Change code due to CR
1 parent ce0160f commit 5ffb0db

23 files changed

+398
-834
lines changed

topic/src/main/java/tech/ydb/topic/TopicClient.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import tech.ydb.core.Result;
99
import tech.ydb.core.Status;
1010
import tech.ydb.core.grpc.GrpcTransport;
11+
import tech.ydb.topic.description.Codec;
1112
import tech.ydb.topic.description.ConsumerDescription;
12-
import tech.ydb.topic.description.CustomTopicCodec;
1313
import tech.ydb.topic.description.TopicDescription;
1414
import tech.ydb.topic.impl.GrpcTopicRpc;
1515
import tech.ydb.topic.impl.TopicClientImpl;
@@ -168,20 +168,11 @@ default CompletableFuture<Result<ConsumerDescription>> describeConsumer(String p
168168
/**
169169
* Register custom codec implementation to TopicClient *
170170
*
171-
* @param codec - codec identifier (must be more than 10000)
172-
* @param customTopicCodec - custom implementation
171+
* @param codec - custom implementation
173172
*/
174-
void registerCodec(int codec, CustomTopicCodec customTopicCodec);
173+
void registerCodec(Codec codec);
175174

176175

177-
/**
178-
* Unregister custom codec implementation
179-
*
180-
* @param codec - codec identifier (must be more than 10000)
181-
* @return implementation was before
182-
*/
183-
CustomTopicCodec unregisterCodec(int codec);
184-
185176
/**
186177
* BUILDER
187178
*/
Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,83 @@
11
package tech.ydb.topic.description;
22

3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
37
/**
4-
* @author Nikolay Perfilov
8+
9+
*
10+
* Interface for custom codec implementation.
11+
* <p>
512
*
6-
* List of reserved codecs
13+
* You can use custom codec as below
14+
* 1. Implement interface methods
15+
* Specify getId which return value more than 10000. This value identify codec across others
16+
* 2. Use code below to write data
17+
* Codec codecImpl = ....
18+
* Topic client = TopicClient.newClient(ydbTransport).build();
19+
* <p>
20+
* client.registerCodec(codecImpl);
21+
* WriterSettings settings = WriterSettings.newBuilder()
22+
* .setTopicPath(topicName)
23+
* .setCodec(codecId)
24+
* .build();
25+
* <p>
26+
* SyncWriter writer = client.createSyncWriter(settings);
27+
* <p>
28+
* 3. Use to read data. Codec should be registered in {@link CodecRegistry}
29+
* Codec codecImpl = ....
30+
* Topic client = TopicClient.newClient(ydbTransport).build();
31+
* <p>
32+
* ReaderSettings readerSettings = ReaderSettings.newBuilder()
33+
* .addTopic(TopicReadSettings.newBuilder().setPath(topicName).build())
34+
* .setConsumerName(TEST_CONSUMER1)
35+
* .build();
36+
* <p>
37+
* SyncReader reader = client.createSyncReader(readerSettings);
38+
*
39+
* @author Nikolay Perfilov
740
*/
8-
public class Codec {
9-
public static final int RAW = 1;
10-
public static final int GZIP = 2;
11-
public static final int LZOP = 3;
12-
public static final int ZSTD = 4;
13-
public static final int CUSTOM = 10000;
14-
15-
private static final Codec INSTANCE = new Codec();
41+
public interface Codec {
42+
int RAW = 1;
43+
int GZIP = 2;
44+
int LZOP = 3;
45+
int ZSTD = 4;
46+
int CUSTOM = 10000;
1647

17-
private Codec() {
48+
/**
49+
* Check is codec is reserved
50+
*
51+
* @param codec codec id
52+
* @return true - codec id is reserved; false - elsewhere
53+
*/
54+
default boolean isReserved(int codec) {
55+
return codec <= CUSTOM;
1856
}
1957

20-
public static Codec getInstance() {
21-
return INSTANCE;
22-
}
58+
/**
59+
* Get codec identifier
60+
* @return codec identifier
61+
*/
62+
int getId();
2363

24-
public boolean isReserved(int codec) {
25-
return codec <= CUSTOM;
26-
}
64+
/**
65+
* Decode data
66+
*
67+
* @param byteArrayInputStream input stream
68+
* @return output stream
69+
* @throws IOException throws when error occurs
70+
*/
71+
72+
InputStream decode(InputStream byteArrayInputStream) throws IOException;
73+
74+
/**
75+
* Encode data
76+
*
77+
* @param byteArrayOutputStream output stream
78+
* @return output stream
79+
* @throws IOException throws when error occurs
80+
*/
81+
OutputStream encode(OutputStream byteArrayOutputStream) throws IOException;
2782

2883
}
Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,67 @@
11
package tech.ydb.topic.description;
22

3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import tech.ydb.topic.impl.GzipCodec;
10+
import tech.ydb.topic.impl.LzopCodec;
11+
import tech.ydb.topic.impl.RawCodec;
12+
import tech.ydb.topic.impl.ZstdCodec;
13+
314
/**
4-
* Interface for register custom codec
15+
* Register for custom topic codec. Local to TopicClient
516
*
617
* @author Evgeny Kuvardin
718
**/
8-
public interface CodecRegistry {
19+
public class CodecRegistry {
20+
21+
private static final Logger logger = LoggerFactory.getLogger(CodecRegistry.class);
22+
23+
final Map<Integer, Codec> customCodecMap;
24+
25+
public CodecRegistry() {
26+
customCodecMap = new HashMap<>();
27+
customCodecMap.put(Codec.RAW, RawCodec.getInstance());
28+
customCodecMap.put(Codec.GZIP, GzipCodec.getInstance());
29+
customCodecMap.put(Codec.LZOP, LzopCodec.getInstance());
30+
customCodecMap.put(Codec.ZSTD, ZstdCodec.getInstance());
31+
}
932

1033
/**
1134
* Register codec implementation
12-
* @param codec codec identifier
13-
* @param customTopicCodec codec implementation
35+
* @param codec codec implementation
1436
* @return previous implementation with associated codec
1537
*/
16-
CustomTopicCodec registerCustomCodec(int codec, CustomTopicCodec customTopicCodec);
38+
public Codec registerCodec(Codec codec) {
39+
assert codec != null;
40+
int codecId = codec.getId();
1741

18-
/**
19-
* Unregister codec implementation
20-
* @param codec codec identifier
21-
* @return previous implementation with associated codec
22-
*/
23-
CustomTopicCodec unregisterCustomCodec(int codec);
42+
if (codec.isReserved(codecId)) {
43+
throw new RuntimeException(
44+
"Create custom codec for reserved code not allowed: " + codec + " .Use code more than 10000");
45+
}
46+
Codec result = customCodecMap.put(codecId, codec);
47+
48+
if (result != null) {
49+
logger.info(
50+
"Replace codec which have already associated with this id. CodecId: {} Codec: {}",
51+
codecId,
52+
result);
53+
}
54+
55+
return result;
56+
}
2457

2558
/**
2659
* Get codec implementation by associated id
27-
* @param codec codec identifier
60+
* @param codecId codec identifier
2861
* @return codec implementation
2962
*/
30-
CustomTopicCodec getCustomCodec(int codec);
63+
public Codec getCodec(int codecId) {
64+
return customCodecMap.get(codecId);
65+
}
3166

3267
}

topic/src/main/java/tech/ydb/topic/description/CodecRegistryImpl.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

topic/src/main/java/tech/ydb/topic/description/CustomTopicCodec.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

topic/src/main/java/tech/ydb/topic/impl/CodecRegistryImpl.java

Lines changed: 0 additions & 52 deletions
This file was deleted.

0 commit comments

Comments
 (0)