Skip to content

Commit 844e34e

Browse files
committed
Impl HeartbeatSerde
1 parent f4c6692 commit 844e34e

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.kafbat.ui.serdes.builtin.UInt32Serde;
2121
import io.kafbat.ui.serdes.builtin.UInt64Serde;
2222
import io.kafbat.ui.serdes.builtin.UuidBinarySerde;
23+
import io.kafbat.ui.serdes.builtin.mm2.HeartbeatSerde;
2324
import io.kafbat.ui.serdes.builtin.sr.SchemaRegistrySerde;
2425
import java.util.LinkedHashMap;
2526
import java.util.Map;
@@ -51,6 +52,9 @@ public SerdesInitializer() {
5152
.put(HexSerde.name(), HexSerde.class)
5253
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
5354
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
55+
56+
// mm2 serdes
57+
.put(HeartbeatSerde.name(), HeartbeatSerde.class)
5458
.build(),
5559
new CustomSerdeLoader()
5660
);
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.kafbat.ui.serde.api.DeserializeResult;
6+
import io.kafbat.ui.serde.api.SchemaDescription;
7+
import io.kafbat.ui.serdes.BuiltInSerde;
8+
import java.nio.ByteBuffer;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.kafka.common.protocol.types.Field;
13+
import org.apache.kafka.common.protocol.types.Schema;
14+
import org.apache.kafka.common.protocol.types.Struct;
15+
import org.apache.kafka.common.protocol.types.Type;
16+
17+
@Slf4j
18+
public class HeartbeatSerde implements BuiltInSerde {
19+
20+
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
21+
22+
public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
23+
public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
24+
public static final String TIMESTAMP_KEY = "timestamp";
25+
public static final String VERSION_KEY = "version";
26+
public static final short VERSION = 0;
27+
28+
public static final Schema VALUE_SCHEMA_V0 = new Schema(
29+
new Field(TIMESTAMP_KEY, Type.INT64));
30+
31+
public static final Schema KEY_SCHEMA = new Schema(
32+
new Field(SOURCE_CLUSTER_ALIAS_KEY, Type.STRING),
33+
new Field(TARGET_CLUSTER_ALIAS_KEY, Type.STRING));
34+
35+
public static final Schema HEADER_SCHEMA = new Schema(
36+
new Field(VERSION_KEY, Type.INT16));
37+
38+
public static String name() {
39+
return "Heartbeat";
40+
}
41+
42+
@Override
43+
public Optional<String> getDescription() {
44+
return Optional.empty();
45+
}
46+
47+
@Override
48+
public Optional<SchemaDescription> getSchema(String topic, Target type) {
49+
return Optional.empty();
50+
}
51+
52+
@Override
53+
public boolean canDeserialize(String topic, Target type) {
54+
return true;
55+
}
56+
57+
@Override
58+
public boolean canSerialize(String topic, Target type) {
59+
return false;
60+
}
61+
62+
@Override
63+
public Serializer serializer(String topic, Target type) {
64+
throw new UnsupportedOperationException();
65+
}
66+
67+
@Override
68+
public Deserializer deserializer(String topic, Target target) {
69+
return (recordHeaders, bytes) -> switch (target) {
70+
71+
case KEY: {
72+
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
73+
String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
74+
String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
75+
76+
var map = Map.of(
77+
"sourceClusterAlias", sourceClusterAlias,
78+
"targetClusterAlias", targetClusterAlias
79+
);
80+
81+
try {
82+
var result = OBJECT_MAPPER.writeValueAsString(map);
83+
yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
84+
} catch (JsonProcessingException e) {
85+
log.error("Error serializing record", e);
86+
throw new RuntimeException(e);
87+
}
88+
}
89+
90+
case VALUE: {
91+
ByteBuffer value = ByteBuffer.wrap(bytes);
92+
Struct headerStruct = HEADER_SCHEMA.read(value);
93+
short version = headerStruct.getShort(VERSION_KEY);
94+
Struct valueStruct = valueSchema(version).read(value);
95+
long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
96+
yield new DeserializeResult(String.valueOf(timestamp), DeserializeResult.Type.STRING, Map.of());
97+
}
98+
99+
};
100+
}
101+
102+
private static Schema valueSchema(short version) {
103+
assert version == 0;
104+
return VALUE_SCHEMA_V0;
105+
}
106+
}

0 commit comments

Comments
 (0)