Commit c25604b
lint
1 parent 13c951f

3 files changed: +126 additions, -123 deletions

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java

Lines changed: 53 additions & 52 deletions
@@ -17,7 +17,7 @@
 @Slf4j
 public class CheckpointSerde implements BuiltInSerde {
 
-  private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   public static final String TOPIC_KEY = "topic";
   public static final String PARTITION_KEY = "partition";
@@ -72,57 +72,58 @@ public Serializer serializer(String topic, Target type) {
 
   @Override
   public Deserializer deserializer(String topic, Target target) {
-    return (recordHeaders, bytes) -> switch (target) {
-
-      case KEY: {
-        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
-
-        String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
-        String t = keyStruct.getString(TOPIC_KEY);
-        int partition = keyStruct.getInt(PARTITION_KEY);
-
-        var map = Map.of(
-            CONSUMER_GROUP_ID_KEY, group,
-            TOPIC_KEY, t,
-            PARTITION_KEY, partition
-        );
-
-        try {
-          var result = OBJECT_MAPPER.writeValueAsString(map);
-          yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
-        } catch (JsonProcessingException e) {
-          log.error("Error serializing record", e);
-          throw new RuntimeException(e);
-        }
-      }
-
-      case VALUE: {
-        ByteBuffer value = ByteBuffer.wrap(bytes);
-        Struct header = HEADER_SCHEMA.read(value);
-        short version = header.getShort(VERSION_KEY);
-        Schema valueSchema = valueSchema(version);
-        Struct valueStruct = valueSchema.read(value);
-
-        long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
-        long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
-        String metadata = valueStruct.getString(METADATA_KEY);
-
-        var map = Map.of(
-            UPSTREAM_OFFSET_KEY, upstreamOffset,
-            DOWNSTREAM_OFFSET_KEY, downstreamOffset,
-            METADATA_KEY, metadata
-        );
-
-        try {
-          var result = OBJECT_MAPPER.writeValueAsString(map);
-          yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
-        } catch (JsonProcessingException e) {
-          log.error("Error serializing record", e);
-          throw new RuntimeException(e);
-        }
-      }
-
-    };
+    return (recordHeaders, bytes) ->
+        switch (target) {
+
+          case KEY: {
+            Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
+
+            String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
+            String t = keyStruct.getString(TOPIC_KEY);
+            int partition = keyStruct.getInt(PARTITION_KEY);
+
+            var map = Map.of(
+                CONSUMER_GROUP_ID_KEY, group,
+                TOPIC_KEY, t,
+                PARTITION_KEY, partition
+            );
+
+            try {
+              var result = OBJECT_MAPPER.writeValueAsString(map);
+              yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
+            } catch (JsonProcessingException e) {
+              log.error("Error serializing record", e);
+              throw new RuntimeException(e);
+            }
+          }
+
+          case VALUE: {
+            ByteBuffer value = ByteBuffer.wrap(bytes);
+            Struct header = HEADER_SCHEMA.read(value);
+            short version = header.getShort(VERSION_KEY);
+            Schema valueSchema = valueSchema(version);
+            Struct valueStruct = valueSchema.read(value);
+
+            long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
+            long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
+            String metadata = valueStruct.getString(METADATA_KEY);
+
+            var map = Map.of(
+                UPSTREAM_OFFSET_KEY, upstreamOffset,
+                DOWNSTREAM_OFFSET_KEY, downstreamOffset,
+                METADATA_KEY, metadata
+            );
+
+            try {
+              var result = OBJECT_MAPPER.writeValueAsString(map);
+              yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
+            } catch (JsonProcessingException e) {
+              log.error("Error serializing record", e);
+              throw new RuntimeException(e);
+            }
+          }
+
+        };
   }
 
   private static Schema valueSchema(short version) {
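Note: the change above is formatting only; the deserializer still returns a lambda whose body is a switch expression over Target, with each block-bodied case producing its DeserializeResult via yield. A minimal, self-contained sketch of that Java pattern follows; the enum, the byte handling, and the class name are placeholders for illustration, not the kafbat-ui Serde API.

import java.util.function.BiFunction;

// Sketch of the lambda + switch-expression pattern used in CheckpointSerde#deserializer.
// Target and the "decoding" here are stand-ins, not the real kafbat-ui types.
public class SwitchExpressionSketch {

  enum Target { KEY, VALUE }

  public static void main(String[] args) {
    BiFunction<Target, byte[], String> deserializer = (target, bytes) ->
        switch (target) {

          case KEY: {
            // pretend to decode a key; yield returns this case's value from the switch expression
            yield "key (" + bytes.length + " bytes)";
          }

          case VALUE: {
            // pretend to decode a value
            yield "value (" + bytes.length + " bytes)";
          }
        };

    System.out.println(deserializer.apply(Target.KEY, new byte[] {1, 2, 3}));
  }
}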

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java

Lines changed: 33 additions & 32 deletions
@@ -17,7 +17,7 @@
 @Slf4j
 public class HeartbeatSerde implements BuiltInSerde {
 
-  private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
   public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
@@ -66,37 +66,38 @@ public Serializer serializer(String topic, Target type) {
 
   @Override
   public Deserializer deserializer(String topic, Target target) {
-    return (recordHeaders, bytes) -> switch (target) {
-
-      case KEY: {
-        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
-        String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
-        String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
-
-        var map = Map.of(
-            "sourceClusterAlias", sourceClusterAlias,
-            "targetClusterAlias", targetClusterAlias
-        );
-
-        try {
-          var result = OBJECT_MAPPER.writeValueAsString(map);
-          yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
-        } catch (JsonProcessingException e) {
-          log.error("Error serializing record", e);
-          throw new RuntimeException(e);
-        }
-      }
-
-      case VALUE: {
-        ByteBuffer value = ByteBuffer.wrap(bytes);
-        Struct headerStruct = HEADER_SCHEMA.read(value);
-        short version = headerStruct.getShort(VERSION_KEY);
-        Struct valueStruct = valueSchema(version).read(value);
-        long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
-        yield new DeserializeResult(String.valueOf(timestamp), DeserializeResult.Type.STRING, Map.of());
-      }
-
-    };
+    return (recordHeaders, bytes) ->
+        switch (target) {
+
+          case KEY: {
+            Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
+            String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
+            String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
+
+            var map = Map.of(
+                "sourceClusterAlias", sourceClusterAlias,
+                "targetClusterAlias", targetClusterAlias
+            );
+
+            try {
+              var result = OBJECT_MAPPER.writeValueAsString(map);
+              yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
+            } catch (JsonProcessingException e) {
+              log.error("Error serializing record", e);
+              throw new RuntimeException(e);
+            }
+          }
+
+          case VALUE: {
+            ByteBuffer value = ByteBuffer.wrap(bytes);
+            Struct headerStruct = HEADER_SCHEMA.read(value);
+            short version = headerStruct.getShort(VERSION_KEY);
+            Struct valueStruct = valueSchema(version).read(value);
+            long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
+            yield new DeserializeResult(String.valueOf(timestamp), DeserializeResult.Type.STRING, Map.of());
+          }
+
+        };
   }
 
   private static Schema valueSchema(short version) {
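Both CheckpointSerde and HeartbeatSerde decode values the same way: a header struct carrying a schema version is read first, and that version selects the schema used for the remainder of the buffer. A rough, self-contained sketch of that version-dispatch idea follows, using plain ByteBuffer reads and an invented field layout rather than the real MirrorMaker 2 wire format:

import java.nio.ByteBuffer;

// Illustrative only: read a version from a small header, then decode the rest of the
// payload according to that version, mirroring the valueSchema(version) dispatch above.
public class VersionedValueSketch {

  record Heartbeat(short version, long timestamp) { }

  static Heartbeat decode(ByteBuffer value) {
    short version = value.getShort();   // header: schema version
    return switch (version) {
      // in this sketch every known version carries just a timestamp
      case 0, 1 -> new Heartbeat(version, value.getLong());
      default -> throw new IllegalArgumentException("Unsupported version: " + version);
    };
  }

  public static void main(String[] args) {
    ByteBuffer value = ByteBuffer.allocate(Short.BYTES + Long.BYTES)
        .putShort((short) 0)
        .putLong(1_700_000_000_000L)
        .flip();
    System.out.println(decode(value));   // Heartbeat[version=0, timestamp=1700000000000]
  }
}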

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java

Lines changed: 40 additions & 39 deletions
@@ -17,7 +17,7 @@
 @Slf4j
 public class OffsetSyncSerde implements BuiltInSerde {
 
-  private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
   public static final String TOPIC_KEY = "topic";
   public static final String PARTITION_KEY = "partition";
@@ -62,43 +62,44 @@ public Serializer serializer(String topic, Target type) {
 
   @Override
   public Deserializer deserializer(String topic, Target target) {
-    return (recordHeaders, bytes) -> switch (target) {
-
-      case KEY: {
-        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
-        String t = keyStruct.getString(TOPIC_KEY);
-        int partition = keyStruct.getInt(PARTITION_KEY);
-
-        var map = Map.of(
-            TOPIC_KEY, t,
-            PARTITION_KEY, partition
-        );
-
-        try {
-          var result = OBJECT_MAPPER.writeValueAsString(map);
-          yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
-        } catch (JsonProcessingException e) {
-          log.error("Error serializing record", e);
-          throw new RuntimeException(e);
-        }
-      }
-
-      case VALUE: {
-        Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
-        var map = Map.of(
-            UPSTREAM_OFFSET_KEY, valueStruct.getLong(UPSTREAM_OFFSET_KEY),
-            DOWNSTREAM_OFFSET_KEY, valueStruct.getLong(DOWNSTREAM_OFFSET_KEY)
-        );
-
-        try {
-          var result = OBJECT_MAPPER.writeValueAsString(map);
-          yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
-        } catch (JsonProcessingException e) {
-          log.error("Error serializing record", e);
-          throw new RuntimeException(e);
-        }
-      }
-
-    };
+    return (recordHeaders, bytes) ->
+        switch (target) {
+
+          case KEY: {
+            Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
+            String t = keyStruct.getString(TOPIC_KEY);
+            int partition = keyStruct.getInt(PARTITION_KEY);
+
+            var map = Map.of(
+                TOPIC_KEY, t,
+                PARTITION_KEY, partition
+            );
+
+            try {
+              var result = OBJECT_MAPPER.writeValueAsString(map);
+              yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
+            } catch (JsonProcessingException e) {
+              log.error("Error serializing record", e);
+              throw new RuntimeException(e);
+            }
+          }
+
+          case VALUE: {
+            Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
+            var map = Map.of(
+                UPSTREAM_OFFSET_KEY, valueStruct.getLong(UPSTREAM_OFFSET_KEY),
+                DOWNSTREAM_OFFSET_KEY, valueStruct.getLong(DOWNSTREAM_OFFSET_KEY)
+            );
+
+            try {
+              var result = OBJECT_MAPPER.writeValueAsString(map);
+              yield new DeserializeResult(result, DeserializeResult.Type.STRING, Map.of());
+            } catch (JsonProcessingException e) {
+              log.error("Error serializing record", e);
+              throw new RuntimeException(e);
+            }
+          }
+
+        };
   }
 }
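The other change, repeated in all three serdes, promotes the Jackson ObjectMapper from an instance field to a static final one: a mapper is safe to share across threads once configured, so a single shared instance avoids building a new one per serde. A small sketch of that usage, with illustrative names:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

// Sketch of the shared-mapper pattern behind the OBJECT_MAPPER change above:
// one static, default-configured ObjectMapper reused by every caller.
public class SharedMapperSketch {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  static String toJson(Map<String, ?> fields) {
    try {
      return OBJECT_MAPPER.writeValueAsString(fields);
    } catch (JsonProcessingException e) {
      // same handling as the serdes: rethrow as unchecked
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(toJson(Map.of("topic", "orders", "partition", 0)));
  }
}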
