Skip to content

Commit c5cb67d

Browse files
committed
Reduced duplicate code
1 parent 7e25c7c commit c5cb67d

File tree

7 files changed

+61
-105
lines changed

7 files changed

+61
-105
lines changed

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -169,48 +169,21 @@ private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
169169
}
170170

171171
private void registerMirrorMakerSerdes(Map<String, SerdeInstance> serdes) {
172-
registerHeartbeatSerde(serdes);
173-
registerOffsetSyncSerde(serdes);
174-
registerCheckpointSerde(serdes);
175-
}
176-
177-
private void registerHeartbeatSerde(Map<String, SerdeInstance> serdes) {
178-
serdes.put(
179-
HeartbeatSerde.name(),
180-
new SerdeInstance(
181-
HeartbeatSerde.name(),
182-
new HeartbeatSerde(),
183-
HeartbeatSerde.TOPIC_NAME_PATTERN,
184-
HeartbeatSerde.TOPIC_NAME_PATTERN,
185-
null
186-
)
187-
);
188-
}
189-
190-
private void registerOffsetSyncSerde(Map<String, SerdeInstance> serdes) {
191-
serdes.put(
192-
OffsetSyncSerde.name(),
193-
new SerdeInstance(
194-
OffsetSyncSerde.name(),
195-
new OffsetSyncSerde(),
196-
OffsetSyncSerde.TOPIC_NAME_PATTERN,
197-
OffsetSyncSerde.TOPIC_NAME_PATTERN,
198-
null
199-
)
172+
Map<String,Map.Entry<Pattern, BuiltInSerde>> mmSerdes = Map.of(
173+
HeartbeatSerde.name(), Map.entry(HeartbeatSerde.TOPIC_NAME_PATTERN, new HeartbeatSerde()),
174+
OffsetSyncSerde.name(), Map.entry(OffsetSyncSerde.TOPIC_NAME_PATTERN, new OffsetSyncSerde()),
175+
CheckpointSerde.name(), Map.entry(CheckpointSerde.TOPIC_NAME_PATTERN, new CheckpointSerde())
200176
);
201-
}
177+
for (Map.Entry<String, Map.Entry<Pattern, BuiltInSerde>> serde : mmSerdes.entrySet()) {
178+
String name = serde.getKey();
179+
Pattern pattern = serde.getValue().getKey();
180+
BuiltInSerde serdeInstance = serde.getValue().getValue();
202181

203-
private void registerCheckpointSerde(Map<String, SerdeInstance> serdes) {
204-
serdes.put(
205-
CheckpointSerde.name(),
206-
new SerdeInstance(
207-
CheckpointSerde.name(),
208-
new CheckpointSerde(),
209-
CheckpointSerde.TOPIC_NAME_PATTERN,
210-
CheckpointSerde.TOPIC_NAME_PATTERN,
211-
null
212-
)
213-
);
182+
serdes.put(
183+
name,
184+
new SerdeInstance(name, serdeInstance, pattern, pattern, null)
185+
);
186+
}
214187
}
215188

216189
private SerdeInstance createFallbackSerde() {

api/src/main/java/io/kafbat/ui/serdes/builtin/ConsumerOffsetsSerde.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323

2424
// Deserialization logic and message's schemas can be found in
2525
// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue)
26-
public class ConsumerOffsetsSerde implements BuiltInSerde {
27-
28-
private static final JsonMapper JSON_MAPPER = createMapper();
26+
public class ConsumerOffsetsSerde extends StructSerde implements BuiltInSerde {
2927

3028
private static final String ASSIGNMENT = "assignment";
3129
private static final String CLIENT_HOST = "client_host";
@@ -50,24 +48,6 @@ public static String name() {
5048
return "__consumer_offsets";
5149
}
5250

53-
private static JsonMapper createMapper() {
54-
var module = new SimpleModule();
55-
module.addSerializer(Struct.class, new JsonSerializer<>() {
56-
@Override
57-
public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
58-
gen.writeStartObject();
59-
for (BoundField field : value.schema().fields()) {
60-
var fieldVal = value.get(field);
61-
gen.writeObjectField(field.def.name, fieldVal);
62-
}
63-
gen.writeEndObject();
64-
}
65-
});
66-
var mapper = new JsonMapper();
67-
mapper.registerModule(module);
68-
return mapper;
69-
}
70-
7151
@Override
7252
public Optional<String> getDescription() {
7353
return Optional.empty();
@@ -304,8 +284,5 @@ private Deserializer valueDeserializer() {
304284
};
305285
}
306286

307-
@SneakyThrows
308-
private String toJson(Struct s) {
309-
return JSON_MAPPER.writeValueAsString(s);
310-
}
287+
311288
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.kafbat.ui.serdes.builtin;
2+
3+
import com.fasterxml.jackson.core.JsonGenerator;
4+
import com.fasterxml.jackson.databind.JsonSerializer;
5+
import com.fasterxml.jackson.databind.SerializerProvider;
6+
import com.fasterxml.jackson.databind.json.JsonMapper;
7+
import com.fasterxml.jackson.databind.module.SimpleModule;
8+
import java.io.IOException;
9+
import lombok.SneakyThrows;
10+
import org.apache.kafka.common.protocol.types.BoundField;
11+
import org.apache.kafka.common.protocol.types.Struct;
12+
13+
public abstract class StructSerde {
14+
15+
private static final JsonMapper JSON_MAPPER = createMapper();
16+
17+
private static JsonMapper createMapper() {
18+
var module = new SimpleModule();
19+
module.addSerializer(Struct.class, new JsonSerializer<>() {
20+
@Override
21+
public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
22+
gen.writeStartObject();
23+
for (BoundField field : value.schema().fields()) {
24+
var fieldVal = value.get(field);
25+
gen.writeObjectField(field.def.name, fieldVal);
26+
}
27+
gen.writeEndObject();
28+
}
29+
});
30+
var mapper = new JsonMapper();
31+
mapper.registerModule(module);
32+
return mapper;
33+
}
34+
35+
@SneakyThrows
36+
protected String toJson(Struct s) {
37+
return JSON_MAPPER.writeValueAsString(s);
38+
}
39+
}

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.kafbat.ui.serdes.BuiltInSerde;
44
import java.util.Optional;
5-
import java.util.OptionalInt;
65
import java.util.regex.Pattern;
76
import lombok.extern.slf4j.Slf4j;
87
import org.apache.kafka.common.protocol.types.Field;

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerde.java

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,19 @@
11
package io.kafbat.ui.serdes.builtin.mm2;
22

3-
import com.fasterxml.jackson.core.JsonGenerator;
4-
import com.fasterxml.jackson.databind.JsonSerializer;
5-
import com.fasterxml.jackson.databind.SerializerProvider;
6-
import com.fasterxml.jackson.databind.json.JsonMapper;
7-
import com.fasterxml.jackson.databind.module.SimpleModule;
83
import io.kafbat.ui.serde.api.DeserializeResult;
94
import io.kafbat.ui.serde.api.SchemaDescription;
105
import io.kafbat.ui.serde.api.Serde;
116
import io.kafbat.ui.serdes.BuiltInSerde;
12-
import java.io.IOException;
7+
import io.kafbat.ui.serdes.builtin.StructSerde;
138
import java.nio.ByteBuffer;
149
import java.util.Map;
1510
import java.util.Optional;
1611
import lombok.RequiredArgsConstructor;
17-
import lombok.SneakyThrows;
18-
import org.apache.kafka.common.protocol.types.BoundField;
1912
import org.apache.kafka.common.protocol.types.Schema;
2013
import org.apache.kafka.common.protocol.types.Struct;
2114

2215
@RequiredArgsConstructor
23-
abstract class MirrorMakerSerde implements BuiltInSerde {
24-
25-
protected static final JsonMapper JSON_MAPPER = createMapper();
26-
27-
protected static JsonMapper createMapper() {
28-
var module = new SimpleModule();
29-
module.addSerializer(Struct.class, new JsonSerializer<>() {
30-
@Override
31-
public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
32-
gen.writeStartObject();
33-
for (BoundField field : value.schema().fields()) {
34-
var fieldVal = value.get(field);
35-
gen.writeObjectField(field.def.name, fieldVal);
36-
}
37-
gen.writeEndObject();
38-
}
39-
});
40-
var mapper = new JsonMapper();
41-
mapper.registerModule(module);
42-
return mapper;
43-
}
16+
abstract class MirrorMakerSerde extends StructSerde implements BuiltInSerde {
4417

4518
protected final boolean versioned;
4619

@@ -69,11 +42,6 @@ public Serde.Serializer serializer(String topic, Serde.Target type) {
6942
throw new UnsupportedOperationException();
7043
}
7144

72-
@SneakyThrows
73-
protected static String toJson(Struct s) {
74-
return JSON_MAPPER.writeValueAsString(s);
75-
}
76-
7745
@Override
7846
public Deserializer deserializer(String topic, Target target) {
7947
return (recordHeaders, bytes) ->

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import io.kafbat.ui.serdes.BuiltInSerde;
44
import java.util.Optional;
5-
import java.util.OptionalInt;
65
import java.util.regex.Pattern;
76
import lombok.extern.slf4j.Slf4j;
87
import org.apache.kafka.common.protocol.types.Field;

api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerdeTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,15 @@ void testDeserializeKey() throws JsonProcessingException {
4646
}
4747

4848
@Test
49-
void testDeserializeValue() {
49+
void testDeserializeValue() throws JsonProcessingException {
5050
var value = decodeBase64("AAAAAAGZgCEMZA==");
51-
var expected = "1758791273572";
51+
var expected = Map.of("timestamp", 1758791273572L);
5252

5353
var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value);
54+
var resultMap = jsonToMap(result.getResult());
5455

55-
assertEquals(DeserializeResult.Type.STRING, result.getType());
56-
assertEquals(expected, result.getResult());
56+
assertEquals(DeserializeResult.Type.JSON, result.getType());
57+
assertEquals(expected, resultMap);
5758
}
5859

5960
}

0 commit comments

Comments
 (0)