Skip to content

Commit 13c951f

Browse files
committed
Impl CheckpointSerde
1 parent 496aa43 commit 13c951f

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.kafbat.ui.serdes.builtin.UInt32Serde;
2121
import io.kafbat.ui.serdes.builtin.UInt64Serde;
2222
import io.kafbat.ui.serdes.builtin.UuidBinarySerde;
23+
import io.kafbat.ui.serdes.builtin.mm2.CheckpointSerde;
2324
import io.kafbat.ui.serdes.builtin.mm2.HeartbeatSerde;
2425
import io.kafbat.ui.serdes.builtin.mm2.OffsetSyncSerde;
2526
import io.kafbat.ui.serdes.builtin.sr.SchemaRegistrySerde;
@@ -57,6 +58,7 @@ public SerdesInitializer() {
5758
// mm2 serdes
5859
.put(HeartbeatSerde.name(), HeartbeatSerde.class)
5960
.put(OffsetSyncSerde.name(), OffsetSyncSerde.class)
61+
.put(CheckpointSerde.name(), CheckpointSerde.class)
6062
.build(),
6163
new CustomSerdeLoader()
6264
);
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.kafbat.ui.serde.api.DeserializeResult;
6+
import io.kafbat.ui.serde.api.SchemaDescription;
7+
import io.kafbat.ui.serdes.BuiltInSerde;
8+
import java.nio.ByteBuffer;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.kafka.common.protocol.types.Field;
13+
import org.apache.kafka.common.protocol.types.Schema;
14+
import org.apache.kafka.common.protocol.types.Struct;
15+
import org.apache.kafka.common.protocol.types.Type;
16+
17+
@Slf4j
18+
public class CheckpointSerde implements BuiltInSerde {
19+
20+
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
21+
22+
public static final String TOPIC_KEY = "topic";
23+
public static final String PARTITION_KEY = "partition";
24+
public static final String CONSUMER_GROUP_ID_KEY = "group";
25+
public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
26+
public static final String DOWNSTREAM_OFFSET_KEY = "offset";
27+
public static final String METADATA_KEY = "metadata";
28+
public static final String VERSION_KEY = "version";
29+
public static final short VERSION = 0;
30+
31+
public static final Schema VALUE_SCHEMA_V0 = new Schema(
32+
new Field(UPSTREAM_OFFSET_KEY, Type.INT64),
33+
new Field(DOWNSTREAM_OFFSET_KEY, Type.INT64),
34+
new Field(METADATA_KEY, Type.STRING));
35+
36+
public static final Schema KEY_SCHEMA = new Schema(
37+
new Field(CONSUMER_GROUP_ID_KEY, Type.STRING),
38+
new Field(TOPIC_KEY, Type.STRING),
39+
new Field(PARTITION_KEY, Type.INT32));
40+
41+
public static final Schema HEADER_SCHEMA = new Schema(
42+
new Field(VERSION_KEY, Type.INT16));
43+
44+
public static String name() {
45+
return "Checkpoint";
46+
}
47+
48+
@Override
49+
public Optional<String> getDescription() {
50+
return Optional.empty();
51+
}
52+
53+
@Override
54+
public Optional<SchemaDescription> getSchema(String topic, Target type) {
55+
return Optional.empty();
56+
}
57+
58+
@Override
59+
public boolean canDeserialize(String topic, Target type) {
60+
return true;
61+
}
62+
63+
@Override
64+
public boolean canSerialize(String topic, Target type) {
65+
return false;
66+
}
67+
68+
@Override
69+
public Serializer serializer(String topic, Target type) {
70+
throw new UnsupportedOperationException();
71+
}
72+
73+
@Override
74+
public Deserializer deserializer(String topic, Target target) {
75+
return (recordHeaders, bytes) -> switch (target) {
76+
77+
case KEY: {
78+
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
79+
80+
String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
81+
String t = keyStruct.getString(TOPIC_KEY);
82+
int partition = keyStruct.getInt(PARTITION_KEY);
83+
84+
var map = Map.of(
85+
CONSUMER_GROUP_ID_KEY, group,
86+
TOPIC_KEY, t,
87+
PARTITION_KEY, partition
88+
);
89+
90+
try {
91+
var result = OBJECT_MAPPER.writeValueAsString(map);
92+
yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
93+
} catch (JsonProcessingException e) {
94+
log.error("Error serializing record", e);
95+
throw new RuntimeException(e);
96+
}
97+
}
98+
99+
case VALUE: {
100+
ByteBuffer value = ByteBuffer.wrap(bytes);
101+
Struct header = HEADER_SCHEMA.read(value);
102+
short version = header.getShort(VERSION_KEY);
103+
Schema valueSchema = valueSchema(version);
104+
Struct valueStruct = valueSchema.read(value);
105+
106+
long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
107+
long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
108+
String metadata = valueStruct.getString(METADATA_KEY);
109+
110+
var map = Map.of(
111+
UPSTREAM_OFFSET_KEY, upstreamOffset,
112+
DOWNSTREAM_OFFSET_KEY, downstreamOffset,
113+
METADATA_KEY, metadata
114+
);
115+
116+
try {
117+
var result = OBJECT_MAPPER.writeValueAsString(map);
118+
yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
119+
} catch (JsonProcessingException e) {
120+
log.error("Error serializing record", e);
121+
throw new RuntimeException(e);
122+
}
123+
}
124+
125+
};
126+
}
127+
128+
private static Schema valueSchema(short version) {
129+
assert version == 0;
130+
return VALUE_SCHEMA_V0;
131+
}
132+
133+
}

0 commit comments

Comments
 (0)