Skip to content

Commit 7e25c7c

Browse files
committed
Reduced duplicate code
1 parent 9c987ef commit 7e25c7c

File tree

4 files changed

+121
-162
lines changed

4 files changed

+121
-162
lines changed
Lines changed: 13 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import io.kafbat.ui.serde.api.DeserializeResult;
4-
import io.kafbat.ui.serde.api.SchemaDescription;
53
import io.kafbat.ui.serdes.BuiltInSerde;
6-
import java.nio.ByteBuffer;
7-
import java.util.Map;
84
import java.util.Optional;
5+
import java.util.OptionalInt;
96
import java.util.regex.Pattern;
107
import lombok.extern.slf4j.Slf4j;
118
import org.apache.kafka.common.protocol.types.Field;
129
import org.apache.kafka.common.protocol.types.Schema;
13-
import org.apache.kafka.common.protocol.types.Struct;
1410
import org.apache.kafka.common.protocol.types.Type;
1511

1612
@Slf4j
@@ -24,7 +20,6 @@ public class CheckpointSerde extends MirrorMakerSerde implements BuiltInSerde {
2420
private static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
2521
private static final String DOWNSTREAM_OFFSET_KEY = "offset";
2622
private static final String METADATA_KEY = "metadata";
27-
private static final String VERSION_KEY = "version";
2823

2924
private static final Schema VALUE_SCHEMA_V0 = new Schema(
3025
new Field(UPSTREAM_OFFSET_KEY, Type.INT64),
@@ -36,65 +31,27 @@ public class CheckpointSerde extends MirrorMakerSerde implements BuiltInSerde {
3631
new Field(TOPIC_KEY, Type.STRING),
3732
new Field(PARTITION_KEY, Type.INT32));
3833

39-
private static final Schema HEADER_SCHEMA = new Schema(
40-
new Field(VERSION_KEY, Type.INT16));
34+
public CheckpointSerde() {
35+
super(true);
36+
}
4137

4238
public static String name() {
4339
return "Checkpoint";
4440
}
4541

4642
@Override
47-
public Optional<String> getDescription() {
48-
return Optional.empty();
49-
}
50-
51-
@Override
52-
public Optional<SchemaDescription> getSchema(String topic, Target type) {
53-
return Optional.empty();
54-
}
55-
56-
@Override
57-
public boolean canDeserialize(String topic, Target type) {
58-
return true;
59-
}
60-
61-
@Override
62-
public boolean canSerialize(String topic, Target type) {
63-
return false;
43+
protected Schema getKeySchema() {
44+
return KEY_SCHEMA;
6445
}
6546

6647
@Override
67-
public Serializer serializer(String topic, Target type) {
68-
throw new UnsupportedOperationException();
69-
}
70-
71-
@Override
72-
public Deserializer deserializer(String topic, Target target) {
73-
return (recordHeaders, bytes) ->
74-
switch (target) {
75-
case KEY -> deserializeKey(bytes);
76-
case VALUE -> deserializeValue(bytes);
77-
};
78-
}
79-
80-
private static DeserializeResult deserializeKey(byte[] bytes) {
81-
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
82-
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
83-
}
84-
85-
private static DeserializeResult deserializeValue(byte[] bytes) {
86-
ByteBuffer value = ByteBuffer.wrap(bytes);
87-
Struct header = HEADER_SCHEMA.read(value);
88-
short version = header.getShort(VERSION_KEY);
89-
Schema valueSchema = valueSchema(version);
90-
Struct valueStruct = valueSchema.read(value);
91-
92-
return new DeserializeResult(toJson(valueStruct), DeserializeResult.Type.JSON, Map.of());
93-
}
94-
95-
private static Schema valueSchema(short version) {
96-
assert version == 0;
97-
return VALUE_SCHEMA_V0;
48+
protected Optional<Schema> getValueSchema(short version) {
49+
if (version == 0) {
50+
return Optional.of(VALUE_SCHEMA_V0);
51+
} else {
52+
log.warn("Unsupported version of CheckpointSerde: {}", version);
53+
return Optional.empty();
54+
}
9855
}
9956

10057
}
Lines changed: 12 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import io.kafbat.ui.serde.api.DeserializeResult;
4-
import io.kafbat.ui.serde.api.SchemaDescription;
53
import io.kafbat.ui.serdes.BuiltInSerde;
6-
import java.nio.ByteBuffer;
7-
import java.util.Map;
84
import java.util.Optional;
95
import java.util.regex.Pattern;
106
import lombok.extern.slf4j.Slf4j;
117
import org.apache.kafka.common.protocol.types.Field;
128
import org.apache.kafka.common.protocol.types.Schema;
13-
import org.apache.kafka.common.protocol.types.Struct;
149
import org.apache.kafka.common.protocol.types.Type;
1510

1611
@Slf4j
@@ -21,7 +16,6 @@ public class HeartbeatSerde extends MirrorMakerSerde implements BuiltInSerde {
2116
private static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
2217
private static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
2318
private static final String TIMESTAMP_KEY = "timestamp";
24-
private static final String VERSION_KEY = "version";
2519

2620
private static final Schema VALUE_SCHEMA_V0 = new Schema(
2721
new Field(TIMESTAMP_KEY, Type.INT64));
@@ -30,64 +24,24 @@ public class HeartbeatSerde extends MirrorMakerSerde implements BuiltInSerde {
3024
new Field(SOURCE_CLUSTER_ALIAS_KEY, Type.STRING),
3125
new Field(TARGET_CLUSTER_ALIAS_KEY, Type.STRING));
3226

33-
private static final Schema HEADER_SCHEMA = new Schema(
34-
new Field(VERSION_KEY, Type.INT16));
27+
public HeartbeatSerde() {
28+
super(true);
29+
}
3530

3631
public static String name() {
3732
return "Heartbeat";
3833
}
3934

40-
@Override
41-
public Optional<String> getDescription() {
42-
return Optional.empty();
43-
}
44-
45-
@Override
46-
public Optional<SchemaDescription> getSchema(String topic, Target type) {
47-
return Optional.empty();
48-
}
49-
50-
@Override
51-
public boolean canDeserialize(String topic, Target type) {
52-
return true;
53-
}
54-
55-
@Override
56-
public boolean canSerialize(String topic, Target type) {
57-
return false;
58-
}
59-
60-
@Override
61-
public Serializer serializer(String topic, Target type) {
62-
throw new UnsupportedOperationException();
63-
}
64-
65-
@Override
66-
public Deserializer deserializer(String topic, Target target) {
67-
return (recordHeaders, bytes) ->
68-
switch (target) {
69-
case KEY -> deserializeKey(bytes);
70-
case VALUE -> deserializeValue(bytes);
71-
};
72-
}
73-
74-
private static DeserializeResult deserializeKey(byte[] bytes) {
75-
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
76-
77-
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
78-
}
79-
80-
private static DeserializeResult deserializeValue(byte[] bytes) {
81-
ByteBuffer value = ByteBuffer.wrap(bytes);
82-
Struct headerStruct = HEADER_SCHEMA.read(value);
83-
short version = headerStruct.getShort(VERSION_KEY);
84-
Struct valueStruct = valueSchema(version).read(value);
85-
long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
86-
return new DeserializeResult(String.valueOf(timestamp), DeserializeResult.Type.STRING, Map.of());
35+
protected Schema getKeySchema() {
36+
return KEY_SCHEMA;
8737
}
8838

89-
private static Schema valueSchema(short version) {
90-
assert version == 0;
91-
return VALUE_SCHEMA_V0;
39+
protected Optional<Schema> getValueSchema(short version) {
40+
if (version == 0) {
41+
return Optional.of(VALUE_SCHEMA_V0);
42+
} else {
43+
log.warn("Unsupported version of HeartbeatSerde: {}", version);
44+
return Optional.empty();
45+
}
9246
}
9347
}

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerde.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,22 @@
55
import com.fasterxml.jackson.databind.SerializerProvider;
66
import com.fasterxml.jackson.databind.json.JsonMapper;
77
import com.fasterxml.jackson.databind.module.SimpleModule;
8+
import io.kafbat.ui.serde.api.DeserializeResult;
9+
import io.kafbat.ui.serde.api.SchemaDescription;
10+
import io.kafbat.ui.serde.api.Serde;
11+
import io.kafbat.ui.serdes.BuiltInSerde;
812
import java.io.IOException;
13+
import java.nio.ByteBuffer;
14+
import java.util.Map;
15+
import java.util.Optional;
16+
import lombok.RequiredArgsConstructor;
917
import lombok.SneakyThrows;
1018
import org.apache.kafka.common.protocol.types.BoundField;
19+
import org.apache.kafka.common.protocol.types.Schema;
1120
import org.apache.kafka.common.protocol.types.Struct;
1221

13-
abstract class MirrorMakerSerde {
22+
@RequiredArgsConstructor
23+
abstract class MirrorMakerSerde implements BuiltInSerde {
1424

1525
protected static final JsonMapper JSON_MAPPER = createMapper();
1626

@@ -32,9 +42,77 @@ public void serialize(Struct value, JsonGenerator gen, SerializerProvider serial
3242
return mapper;
3343
}
3444

45+
protected final boolean versioned;
46+
47+
@Override
48+
public Optional<String> getDescription() {
49+
return Optional.empty();
50+
}
51+
52+
@Override
53+
public Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
54+
return Optional.empty();
55+
}
56+
57+
@Override
58+
public boolean canDeserialize(String topic, Serde.Target type) {
59+
return true;
60+
}
61+
62+
@Override
63+
public boolean canSerialize(String topic, Serde.Target type) {
64+
return false;
65+
}
66+
67+
@Override
68+
public Serde.Serializer serializer(String topic, Serde.Target type) {
69+
throw new UnsupportedOperationException();
70+
}
71+
3572
@SneakyThrows
3673
protected static String toJson(Struct s) {
3774
return JSON_MAPPER.writeValueAsString(s);
3875
}
3976

77+
@Override
78+
public Deserializer deserializer(String topic, Target target) {
79+
return (recordHeaders, bytes) ->
80+
switch (target) {
81+
case KEY -> deserializeKey(bytes);
82+
case VALUE -> deserializeValue(bytes);
83+
};
84+
}
85+
86+
protected DeserializeResult deserializeKey(byte[] bytes) {
87+
Struct keyStruct = getKeySchema().read(ByteBuffer.wrap(bytes));
88+
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
89+
}
90+
91+
protected DeserializeResult deserializeValue(byte[] bytes) {
92+
ByteBuffer wrap = ByteBuffer.wrap(bytes);
93+
Optional<Schema> valueSchema;
94+
if (versioned) {
95+
short version = wrap.getShort();
96+
valueSchema = getValueSchema(version);
97+
} else {
98+
valueSchema = getValueSchema();
99+
}
100+
if (valueSchema.isPresent()) {
101+
Struct valueStruct = valueSchema.get().read(wrap);
102+
return new DeserializeResult(toJson(valueStruct), DeserializeResult.Type.JSON, Map.of());
103+
} else {
104+
throw new IllegalStateException("Value schema was not present");
105+
}
106+
}
107+
108+
protected abstract Schema getKeySchema();
109+
110+
protected Optional<Schema> getValueSchema() {
111+
return Optional.empty();
112+
}
113+
114+
protected Optional<Schema> getValueSchema(short version) {
115+
return Optional.empty();
116+
}
117+
40118
}
Lines changed: 17 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import io.kafbat.ui.serde.api.DeserializeResult;
4-
import io.kafbat.ui.serde.api.SchemaDescription;
53
import io.kafbat.ui.serdes.BuiltInSerde;
6-
import java.nio.ByteBuffer;
7-
import java.util.Map;
84
import java.util.Optional;
5+
import java.util.OptionalInt;
96
import java.util.regex.Pattern;
107
import lombok.extern.slf4j.Slf4j;
118
import org.apache.kafka.common.protocol.types.Field;
129
import org.apache.kafka.common.protocol.types.Schema;
13-
import org.apache.kafka.common.protocol.types.Struct;
1410
import org.apache.kafka.common.protocol.types.Type;
1511

1612
@Slf4j
@@ -22,57 +18,31 @@ public class OffsetSyncSerde extends MirrorMakerSerde implements BuiltInSerde {
2218
private static final Schema KEY_SCHEMA;
2319

2420
static {
25-
VALUE_SCHEMA = new Schema(new Field("upstreamOffset", Type.INT64), new Field("offset", Type.INT64));
26-
KEY_SCHEMA = new Schema(new Field("topic", Type.STRING), new Field("partition", Type.INT32));
21+
VALUE_SCHEMA = new Schema(
22+
new Field("upstreamOffset", Type.INT64),
23+
new Field("offset", Type.INT64)
24+
);
25+
KEY_SCHEMA = new Schema(
26+
new Field("topic", Type.STRING),
27+
new Field("partition", Type.INT32)
28+
);
2729
}
2830

29-
public static String name() {
30-
return "OffsetSync";
31-
}
32-
33-
@Override
34-
public Optional<String> getDescription() {
35-
return Optional.empty();
36-
}
37-
38-
@Override
39-
public Optional<SchemaDescription> getSchema(String topic, Target type) {
40-
return Optional.empty();
41-
}
42-
43-
@Override
44-
public boolean canDeserialize(String topic, Target type) {
45-
return true;
31+
public OffsetSyncSerde() {
32+
super(false);
4633
}
4734

48-
@Override
49-
public boolean canSerialize(String topic, Target type) {
50-
return false;
35+
public static String name() {
36+
return "OffsetSync";
5137
}
5238

5339
@Override
54-
public Serializer serializer(String topic, Target type) {
55-
throw new UnsupportedOperationException();
40+
protected Schema getKeySchema() {
41+
return KEY_SCHEMA;
5642
}
5743

5844
@Override
59-
public Deserializer deserializer(String topic, Target target) {
60-
return (recordHeaders, bytes) ->
61-
switch (target) {
62-
case KEY -> deserializeKey(bytes);
63-
case VALUE -> deserializeValue(bytes);
64-
};
65-
}
66-
67-
private static DeserializeResult deserializeKey(byte[] bytes) {
68-
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
69-
70-
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
71-
}
72-
73-
private static DeserializeResult deserializeValue(byte[] bytes) {
74-
Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
75-
76-
return new DeserializeResult(toJson(valueStruct), DeserializeResult.Type.JSON, Map.of());
45+
protected Optional<Schema> getValueSchema() {
46+
return Optional.of(VALUE_SCHEMA);
7747
}
7848
}

0 commit comments

Comments
 (0)