Skip to content

Commit 496aa43

Browse files
committed
Impl OffsetSyncSerde
1 parent 844e34e commit 496aa43

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.kafbat.ui.serdes.builtin.UInt64Serde;
2222
import io.kafbat.ui.serdes.builtin.UuidBinarySerde;
2323
import io.kafbat.ui.serdes.builtin.mm2.HeartbeatSerde;
24+
import io.kafbat.ui.serdes.builtin.mm2.OffsetSyncSerde;
2425
import io.kafbat.ui.serdes.builtin.sr.SchemaRegistrySerde;
2526
import java.util.LinkedHashMap;
2627
import java.util.Map;
@@ -55,6 +56,7 @@ public SerdesInitializer() {
5556

5657
// mm2 serdes
5758
.put(HeartbeatSerde.name(), HeartbeatSerde.class)
59+
.put(OffsetSyncSerde.name(), OffsetSyncSerde.class)
5860
.build(),
5961
new CustomSerdeLoader()
6062
);
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.kafbat.ui.serde.api.DeserializeResult;
6+
import io.kafbat.ui.serde.api.SchemaDescription;
7+
import io.kafbat.ui.serdes.BuiltInSerde;
8+
import java.nio.ByteBuffer;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.kafka.common.protocol.types.Field;
13+
import org.apache.kafka.common.protocol.types.Schema;
14+
import org.apache.kafka.common.protocol.types.Struct;
15+
import org.apache.kafka.common.protocol.types.Type;
16+
17+
@Slf4j
18+
public class OffsetSyncSerde implements BuiltInSerde {
19+
20+
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
21+
22+
public static final String TOPIC_KEY = "topic";
23+
public static final String PARTITION_KEY = "partition";
24+
public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
25+
public static final String DOWNSTREAM_OFFSET_KEY = "offset";
26+
public static final Schema VALUE_SCHEMA;
27+
public static final Schema KEY_SCHEMA;
28+
29+
static {
30+
VALUE_SCHEMA = new Schema(new Field("upstreamOffset", Type.INT64), new Field("offset", Type.INT64));
31+
KEY_SCHEMA = new Schema(new Field("topic", Type.STRING), new Field("partition", Type.INT32));
32+
}
33+
34+
public static String name() {
35+
return "OffsetSync";
36+
}
37+
38+
@Override
39+
public Optional<String> getDescription() {
40+
return Optional.empty();
41+
}
42+
43+
@Override
44+
public Optional<SchemaDescription> getSchema(String topic, Target type) {
45+
return Optional.empty();
46+
}
47+
48+
@Override
49+
public boolean canDeserialize(String topic, Target type) {
50+
return true;
51+
}
52+
53+
@Override
54+
public boolean canSerialize(String topic, Target type) {
55+
return false;
56+
}
57+
58+
@Override
59+
public Serializer serializer(String topic, Target type) {
60+
throw new UnsupportedOperationException();
61+
}
62+
63+
@Override
64+
public Deserializer deserializer(String topic, Target target) {
65+
return (recordHeaders, bytes) -> switch (target) {
66+
67+
case KEY: {
68+
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
69+
String t = keyStruct.getString(TOPIC_KEY);
70+
int partition = keyStruct.getInt(PARTITION_KEY);
71+
72+
var map = Map.of(
73+
TOPIC_KEY, t,
74+
PARTITION_KEY, partition
75+
);
76+
77+
try {
78+
var result = OBJECT_MAPPER.writeValueAsString(map);
79+
yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
80+
} catch (JsonProcessingException e) {
81+
log.error("Error serializing record", e);
82+
throw new RuntimeException(e);
83+
}
84+
}
85+
86+
case VALUE: {
87+
Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
88+
var map = Map.of(
89+
UPSTREAM_OFFSET_KEY, valueStruct.getLong(UPSTREAM_OFFSET_KEY),
90+
DOWNSTREAM_OFFSET_KEY, valueStruct.getLong(DOWNSTREAM_OFFSET_KEY)
91+
);
92+
93+
try {
94+
var result = OBJECT_MAPPER.writeValueAsString(map);
95+
yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
96+
} catch (JsonProcessingException e) {
97+
log.error("Error serializing record", e);
98+
throw new RuntimeException(e);
99+
}
100+
}
101+
102+
};
103+
}
104+
}

0 commit comments

Comments
 (0)