Skip to content

Commit 9c987ef

Browse files
committed
Simplify serialization
1 parent 1ea2aa6 commit 9c987ef

File tree

4 files changed

+51
-96
lines changed

4 files changed

+51
-96
lines changed

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
4-
import com.fasterxml.jackson.databind.ObjectMapper;
53
import io.kafbat.ui.serde.api.DeserializeResult;
64
import io.kafbat.ui.serde.api.SchemaDescription;
75
import io.kafbat.ui.serdes.BuiltInSerde;
@@ -16,12 +14,10 @@
1614
import org.apache.kafka.common.protocol.types.Type;
1715

1816
@Slf4j
19-
public class CheckpointSerde implements BuiltInSerde {
17+
public class CheckpointSerde extends MirrorMakerSerde implements BuiltInSerde {
2018

2119
public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(".*\\.checkpoints\\.internal");
2220

23-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
24-
2521
private static final String TOPIC_KEY = "topic";
2622
private static final String PARTITION_KEY = "partition";
2723
private static final String CONSUMER_GROUP_ID_KEY = "group";
@@ -83,24 +79,7 @@ public Deserializer deserializer(String topic, Target target) {
8379

8480
private static DeserializeResult deserializeKey(byte[] bytes) {
8581
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
86-
87-
String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
88-
String t = keyStruct.getString(TOPIC_KEY);
89-
int partition = keyStruct.getInt(PARTITION_KEY);
90-
91-
var map = Map.of(
92-
CONSUMER_GROUP_ID_KEY, group,
93-
TOPIC_KEY, t,
94-
PARTITION_KEY, partition
95-
);
96-
97-
try {
98-
var result = OBJECT_MAPPER.writeValueAsString(map);
99-
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
100-
} catch (JsonProcessingException e) {
101-
log.error("Error deserializing record", e);
102-
throw new RuntimeException("Error deserializing record", e);
103-
}
82+
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
10483
}
10584

10685
private static DeserializeResult deserializeValue(byte[] bytes) {
@@ -110,23 +89,7 @@ private static DeserializeResult deserializeValue(byte[] bytes) {
11089
Schema valueSchema = valueSchema(version);
11190
Struct valueStruct = valueSchema.read(value);
11291

113-
long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
114-
long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
115-
String metadata = valueStruct.getString(METADATA_KEY);
116-
117-
var map = Map.of(
118-
UPSTREAM_OFFSET_KEY, upstreamOffset,
119-
DOWNSTREAM_OFFSET_KEY, downstreamOffset,
120-
METADATA_KEY, metadata
121-
);
122-
123-
try {
124-
var result = OBJECT_MAPPER.writeValueAsString(map);
125-
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
126-
} catch (JsonProcessingException e) {
127-
log.error("Error deserializing record", e);
128-
throw new RuntimeException("Error deserializing record", e);
129-
}
92+
return new DeserializeResult(toJson(valueStruct), DeserializeResult.Type.JSON, Map.of());
13093
}
13194

13295
private static Schema valueSchema(short version) {

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
4-
import com.fasterxml.jackson.databind.ObjectMapper;
53
import io.kafbat.ui.serde.api.DeserializeResult;
64
import io.kafbat.ui.serde.api.SchemaDescription;
75
import io.kafbat.ui.serdes.BuiltInSerde;
@@ -16,12 +14,10 @@
1614
import org.apache.kafka.common.protocol.types.Type;
1715

1816
@Slf4j
19-
public class HeartbeatSerde implements BuiltInSerde {
17+
public class HeartbeatSerde extends MirrorMakerSerde implements BuiltInSerde {
2018

2119
public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("heartbeats");
2220

23-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
24-
2521
private static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
2622
private static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
2723
private static final String TIMESTAMP_KEY = "timestamp";
@@ -77,21 +73,8 @@ public Deserializer deserializer(String topic, Target target) {
7773

7874
private static DeserializeResult deserializeKey(byte[] bytes) {
7975
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
80-
String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
81-
String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
82-
83-
var map = Map.of(
84-
"sourceClusterAlias", sourceClusterAlias,
85-
"targetClusterAlias", targetClusterAlias
86-
);
87-
88-
try {
89-
var result = OBJECT_MAPPER.writeValueAsString(map);
90-
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
91-
} catch (JsonProcessingException e) {
92-
log.error("Error deserializing record", e);
93-
throw new RuntimeException("Error deserializing record", e);
94-
}
76+
77+
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
9578
}
9679

9780
private static DeserializeResult deserializeValue(byte[] bytes) {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.kafbat.ui.serdes.builtin.mm2;
2+
3+
import com.fasterxml.jackson.core.JsonGenerator;
4+
import com.fasterxml.jackson.databind.JsonSerializer;
5+
import com.fasterxml.jackson.databind.SerializerProvider;
6+
import com.fasterxml.jackson.databind.json.JsonMapper;
7+
import com.fasterxml.jackson.databind.module.SimpleModule;
8+
import java.io.IOException;
9+
import lombok.SneakyThrows;
10+
import org.apache.kafka.common.protocol.types.BoundField;
11+
import org.apache.kafka.common.protocol.types.Struct;
12+
13+
abstract class MirrorMakerSerde {
14+
15+
protected static final JsonMapper JSON_MAPPER = createMapper();
16+
17+
protected static JsonMapper createMapper() {
18+
var module = new SimpleModule();
19+
module.addSerializer(Struct.class, new JsonSerializer<>() {
20+
@Override
21+
public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
22+
gen.writeStartObject();
23+
for (BoundField field : value.schema().fields()) {
24+
var fieldVal = value.get(field);
25+
gen.writeObjectField(field.def.name, fieldVal);
26+
}
27+
gen.writeEndObject();
28+
}
29+
});
30+
var mapper = new JsonMapper();
31+
mapper.registerModule(module);
32+
return mapper;
33+
}
34+
35+
@SneakyThrows
36+
protected static String toJson(Struct s) {
37+
return JSON_MAPPER.writeValueAsString(s);
38+
}
39+
40+
}

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
4-
import com.fasterxml.jackson.databind.ObjectMapper;
53
import io.kafbat.ui.serde.api.DeserializeResult;
64
import io.kafbat.ui.serde.api.SchemaDescription;
75
import io.kafbat.ui.serdes.BuiltInSerde;
@@ -16,16 +14,10 @@
1614
import org.apache.kafka.common.protocol.types.Type;
1715

1816
@Slf4j
19-
public class OffsetSyncSerde implements BuiltInSerde {
17+
public class OffsetSyncSerde extends MirrorMakerSerde implements BuiltInSerde {
2018

2119
public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("mm2-offset-syncs\\..*\\.internal");
2220

23-
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
24-
25-
private static final String TOPIC_KEY = "topic";
26-
private static final String PARTITION_KEY = "partition";
27-
private static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
28-
private static final String DOWNSTREAM_OFFSET_KEY = "offset";
2921
private static final Schema VALUE_SCHEMA;
3022
private static final Schema KEY_SCHEMA;
3123

@@ -74,36 +66,13 @@ public Deserializer deserializer(String topic, Target target) {
7466

7567
private static DeserializeResult deserializeKey(byte[] bytes) {
7668
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
77-
String t = keyStruct.getString(TOPIC_KEY);
78-
int partition = keyStruct.getInt(PARTITION_KEY);
79-
80-
var map = Map.of(
81-
TOPIC_KEY, t,
82-
PARTITION_KEY, partition
83-
);
84-
85-
try {
86-
var result = OBJECT_MAPPER.writeValueAsString(map);
87-
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
88-
} catch (JsonProcessingException e) {
89-
log.error("Error deserializing record", e);
90-
throw new RuntimeException("Error deserializing record", e);
91-
}
69+
70+
return new DeserializeResult(toJson(keyStruct), DeserializeResult.Type.JSON, Map.of());
9271
}
9372

9473
private static DeserializeResult deserializeValue(byte[] bytes) {
9574
Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
96-
var map = Map.of(
97-
UPSTREAM_OFFSET_KEY, valueStruct.getLong(UPSTREAM_OFFSET_KEY),
98-
DOWNSTREAM_OFFSET_KEY, valueStruct.getLong(DOWNSTREAM_OFFSET_KEY)
99-
);
100-
101-
try {
102-
var result = OBJECT_MAPPER.writeValueAsString(map);
103-
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
104-
} catch (JsonProcessingException e) {
105-
log.error("Error deserializing record", e);
106-
throw new RuntimeException("Error deserializing record", e);
107-
}
75+
76+
return new DeserializeResult(toJson(valueStruct), DeserializeResult.Type.JSON, Map.of());
10877
}
10978
}

0 commit comments

Comments
 (0)