Skip to content

Commit 267d52d

Browse files
committed
Split k/v methods
1 parent fc59e6b commit 267d52d

File tree

3 files changed

+114
-111
lines changed

3 files changed

+114
-111
lines changed

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java

Lines changed: 49 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -74,56 +74,57 @@ public Serializer serializer(String topic, Target type) {
7474
public Deserializer deserializer(String topic, Target target) {
7575
return (recordHeaders, bytes) ->
7676
switch (target) {
77+
case KEY -> deserializeKey(bytes);
78+
case VALUE -> deserializeValue(bytes);
79+
};
80+
}
7781

78-
case KEY: {
79-
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
80-
81-
String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
82-
String t = keyStruct.getString(TOPIC_KEY);
83-
int partition = keyStruct.getInt(PARTITION_KEY);
84-
85-
var map = Map.of(
86-
CONSUMER_GROUP_ID_KEY, group,
87-
TOPIC_KEY, t,
88-
PARTITION_KEY, partition
89-
);
90-
91-
try {
92-
var result = OBJECT_MAPPER.writeValueAsString(map);
93-
yield new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
94-
} catch (JsonProcessingException e) {
95-
log.error("Error deserializing record", e);
96-
throw new RuntimeException("Error deserializing record", e);
97-
}
98-
}
99-
100-
case VALUE: {
101-
ByteBuffer value = ByteBuffer.wrap(bytes);
102-
Struct header = HEADER_SCHEMA.read(value);
103-
short version = header.getShort(VERSION_KEY);
104-
Schema valueSchema = valueSchema(version);
105-
Struct valueStruct = valueSchema.read(value);
106-
107-
long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
108-
long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
109-
String metadata = valueStruct.getString(METADATA_KEY);
110-
111-
var map = Map.of(
112-
UPSTREAM_OFFSET_KEY, upstreamOffset,
113-
DOWNSTREAM_OFFSET_KEY, downstreamOffset,
114-
METADATA_KEY, metadata
115-
);
116-
117-
try {
118-
var result = OBJECT_MAPPER.writeValueAsString(map);
119-
yield new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
120-
} catch (JsonProcessingException e) {
121-
log.error("Error deserializing record", e);
122-
throw new RuntimeException("Error deserializing record", e);
123-
}
124-
}
82+
private static DeserializeResult deserializeKey(byte[] bytes) {
83+
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
84+
85+
String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
86+
String t = keyStruct.getString(TOPIC_KEY);
87+
int partition = keyStruct.getInt(PARTITION_KEY);
88+
89+
var map = Map.of(
90+
CONSUMER_GROUP_ID_KEY, group,
91+
TOPIC_KEY, t,
92+
PARTITION_KEY, partition
93+
);
94+
95+
try {
96+
var result = OBJECT_MAPPER.writeValueAsString(map);
97+
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
98+
} catch (JsonProcessingException e) {
99+
log.error("Error deserializing record", e);
100+
throw new RuntimeException("Error deserializing record", e);
101+
}
102+
}
125103

126-
};
104+
private static DeserializeResult deserializeValue(byte[] bytes) {
105+
ByteBuffer value = ByteBuffer.wrap(bytes);
106+
Struct header = HEADER_SCHEMA.read(value);
107+
short version = header.getShort(VERSION_KEY);
108+
Schema valueSchema = valueSchema(version);
109+
Struct valueStruct = valueSchema.read(value);
110+
111+
long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
112+
long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
113+
String metadata = valueStruct.getString(METADATA_KEY);
114+
115+
var map = Map.of(
116+
UPSTREAM_OFFSET_KEY, upstreamOffset,
117+
DOWNSTREAM_OFFSET_KEY, downstreamOffset,
118+
METADATA_KEY, metadata
119+
);
120+
121+
try {
122+
var result = OBJECT_MAPPER.writeValueAsString(map);
123+
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
124+
} catch (JsonProcessingException e) {
125+
log.error("Error deserializing record", e);
126+
throw new RuntimeException("Error deserializing record", e);
127+
}
127128
}
128129

129130
private static Schema valueSchema(short version) {

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -68,36 +68,37 @@ public Serializer serializer(String topic, Target type) {
6868
public Deserializer deserializer(String topic, Target target) {
6969
return (recordHeaders, bytes) ->
7070
switch (target) {
71+
case KEY -> deserializeKey(bytes);
72+
case VALUE -> deserializeValue(bytes);
73+
};
74+
}
7175

72-
case KEY: {
73-
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
74-
String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
75-
String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
76-
77-
var map = Map.of(
78-
"sourceClusterAlias", sourceClusterAlias,
79-
"targetClusterAlias", targetClusterAlias
80-
);
81-
82-
try {
83-
var result = OBJECT_MAPPER.writeValueAsString(map);
84-
yield new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
85-
} catch (JsonProcessingException e) {
86-
log.error("Error deserializing record", e);
87-
throw new RuntimeException("Error deserializing record", e);
88-
}
89-
}
90-
91-
case VALUE: {
92-
ByteBuffer value = ByteBuffer.wrap(bytes);
93-
Struct headerStruct = HEADER_SCHEMA.read(value);
94-
short version = headerStruct.getShort(VERSION_KEY);
95-
Struct valueStruct = valueSchema(version).read(value);
96-
long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
97-
yield new DeserializeResult(String.valueOf(timestamp), DeserializeResult.Type.STRING, Map.of());
98-
}
76+
private static DeserializeResult deserializeKey(byte[] bytes) {
77+
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
78+
String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
79+
String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
80+
81+
var map = Map.of(
82+
"sourceClusterAlias", sourceClusterAlias,
83+
"targetClusterAlias", targetClusterAlias
84+
);
85+
86+
try {
87+
var result = OBJECT_MAPPER.writeValueAsString(map);
88+
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
89+
} catch (JsonProcessingException e) {
90+
log.error("Error deserializing record", e);
91+
throw new RuntimeException("Error deserializing record", e);
92+
}
93+
}
9994

100-
};
95+
private static DeserializeResult deserializeValue(byte[] bytes) {
96+
ByteBuffer value = ByteBuffer.wrap(bytes);
97+
Struct headerStruct = HEADER_SCHEMA.read(value);
98+
short version = headerStruct.getShort(VERSION_KEY);
99+
Struct valueStruct = valueSchema(version).read(value);
100+
long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
101+
return new DeserializeResult(String.valueOf(timestamp), DeserializeResult.Type.STRING, Map.of());
101102
}
102103

103104
private static Schema valueSchema(short version) {

api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -64,42 +64,43 @@ public Serializer serializer(String topic, Target type) {
6464
public Deserializer deserializer(String topic, Target target) {
6565
return (recordHeaders, bytes) ->
6666
switch (target) {
67+
case KEY -> deserializeKey(bytes);
68+
case VALUE -> deserializeValue(bytes);
69+
};
70+
}
6771

68-
case KEY: {
69-
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
70-
String t = keyStruct.getString(TOPIC_KEY);
71-
int partition = keyStruct.getInt(PARTITION_KEY);
72-
73-
var map = Map.of(
74-
TOPIC_KEY, t,
75-
PARTITION_KEY, partition
76-
);
77-
78-
try {
79-
var result = OBJECT_MAPPER.writeValueAsString(map);
80-
yield new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
81-
} catch (JsonProcessingException e) {
82-
log.error("Error deserializing record", e);
83-
throw new RuntimeException("Error deserializing record", e);
84-
}
85-
}
86-
87-
case VALUE: {
88-
Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
89-
var map = Map.of(
90-
UPSTREAM_OFFSET_KEY, valueStruct.getLong(UPSTREAM_OFFSET_KEY),
91-
DOWNSTREAM_OFFSET_KEY, valueStruct.getLong(DOWNSTREAM_OFFSET_KEY)
92-
);
93-
94-
try {
95-
var result = OBJECT_MAPPER.writeValueAsString(map);
96-
yield new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
97-
} catch (JsonProcessingException e) {
98-
log.error("Error deserializing record", e);
99-
throw new RuntimeException("Error deserializing record", e);
100-
}
101-
}
72+
private static DeserializeResult deserializeKey(byte[] bytes) {
73+
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(bytes));
74+
String t = keyStruct.getString(TOPIC_KEY);
75+
int partition = keyStruct.getInt(PARTITION_KEY);
76+
77+
var map = Map.of(
78+
TOPIC_KEY, t,
79+
PARTITION_KEY, partition
80+
);
81+
82+
try {
83+
var result = OBJECT_MAPPER.writeValueAsString(map);
84+
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
85+
} catch (JsonProcessingException e) {
86+
log.error("Error deserializing record", e);
87+
throw new RuntimeException("Error deserializing record", e);
88+
}
89+
}
10290

103-
};
91+
private static DeserializeResult deserializeValue(byte[] bytes) {
92+
Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(bytes));
93+
var map = Map.of(
94+
UPSTREAM_OFFSET_KEY, valueStruct.getLong(UPSTREAM_OFFSET_KEY),
95+
DOWNSTREAM_OFFSET_KEY, valueStruct.getLong(DOWNSTREAM_OFFSET_KEY)
96+
);
97+
98+
try {
99+
var result = OBJECT_MAPPER.writeValueAsString(map);
100+
return new DeserializeResult(result, DeserializeResult.Type.JSON, Map.of());
101+
} catch (JsonProcessingException e) {
102+
log.error("Error deserializing record", e);
103+
throw new RuntimeException("Error deserializing record", e);
104+
}
104105
}
105106
}

0 commit comments

Comments
 (0)