Skip to content

Commit 40760ca

Browse files
committed
Reduced duplicate code
1 parent c5cb67d commit 40760ca

File tree

3 files changed

+12
-13
lines changed

3 files changed

+12
-13
lines changed

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ protected Schema getKeySchema() {
4444
}
4545

4646
@Override
47-
protected Optional<Schema> getValueSchema(short version) {
47+
protected Optional<Schema> getVersionedValueSchema(short version) {
4848
if (version == 0) {
4949
return Optional.of(VALUE_SCHEMA_V0);
5050
} else {

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ protected Schema getKeySchema() {
3636
return KEY_SCHEMA;
3737
}
3838

39-
protected Optional<Schema> getValueSchema(short version) {
39+
@Override
40+
protected Optional<Schema> getVersionedValueSchema(short version) {
4041
if (version == 0) {
4142
return Optional.of(VALUE_SCHEMA_V0);
4243
} else {

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerde.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,29 +45,27 @@ public Serde.Serializer serializer(String topic, Serde.Target type) {
4545
@Override
4646
public Deserializer deserializer(String topic, Target target) {
4747
return (recordHeaders, bytes) ->
48-
switch (target) {
48+
new DeserializeResult(toJson(switch (target) {
4949
case KEY -> deserializeKey(bytes);
5050
case VALUE -> deserializeValue(bytes);
51-
};
51+
}), DeserializeResult.Type.JSON, Map.of());
5252
}
5353

54-
protected DeserializeResult deserializeKey(byte[] bytes) {
55-
Struct keyStruct = getKeySchema().read(ByteBuffer.wrap(bytes));
56-
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
54+
protected Struct deserializeKey(byte[] bytes) {
55+
return getKeySchema().read(ByteBuffer.wrap(bytes));
5756
}
5857

59-
protected DeserializeResult deserializeValue(byte[] bytes) {
58+
protected Struct deserializeValue(byte[] bytes) {
6059
ByteBuffer wrap = ByteBuffer.wrap(bytes);
6160
Optional<Schema> valueSchema;
6261
if (versioned) {
6362
short version = wrap.getShort();
64-
valueSchema = getValueSchema(version);
63+
valueSchema = getVersionedValueSchema(version);
6564
} else {
6665
valueSchema = getValueSchema();
6766
}
6867
if (valueSchema.isPresent()) {
69-
Struct valueStruct = valueSchema.get().read(wrap);
70-
return new DeserializeResult(toJson(valueStruct), DeserializeResult.Type.JSON, Map.of());
68+
return valueSchema.get().read(wrap);
7169
} else {
7270
throw new IllegalStateException("Value schema was not present");
7371
}
@@ -79,8 +77,8 @@ protected Optional<Schema> getValueSchema() {
7977
return Optional.empty();
8078
}
8179

82-
protected Optional<Schema> getValueSchema(short version) {
83-
return Optional.empty();
80+
protected Optional<Schema> getVersionedValueSchema(short version) {
81+
throw new UnsupportedOperationException("Versioned value schema is not supported");
8482
}
8583

8684
}

0 commit comments

Comments
 (0)