From 3890020aeac0cc071a25d7214a5339fef96a1bd2 Mon Sep 17 00:00:00 2001 From: YeeaaahMan Date: Tue, 24 Mar 2026 22:26:28 +0200 Subject: [PATCH 1/4] BE: support MessagePackSerde --- api/build.gradle | 1 + .../kafbat/ui/serdes/SerdesInitializer.java | 2 + .../ui/serdes/builtin/MessagePackSerde.java | 56 +++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java diff --git a/api/build.gradle b/api/build.gradle index 7e7469c86..a46bdb50d 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -58,6 +58,7 @@ dependencies { implementation libs.confluent.avro.serializer implementation libs.confluent.protobuf.serializer implementation libs.confluent.json.schema.serializer + implementation 'org.msgpack:msgpack-core:0.9.11' implementation libs.aws.msk.auth implementation(libs.azure.identity) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java b/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java index 3b2eb52fc..675eaa760 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java +++ b/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java @@ -14,6 +14,7 @@ import io.kafbat.ui.serdes.builtin.HexSerde; import io.kafbat.ui.serdes.builtin.Int32Serde; import io.kafbat.ui.serdes.builtin.Int64Serde; +import io.kafbat.ui.serdes.builtin.MessagePackSerde; import io.kafbat.ui.serdes.builtin.ProtobufFileSerde; import io.kafbat.ui.serdes.builtin.ProtobufRawSerde; import io.kafbat.ui.serdes.builtin.StringSerde; @@ -52,6 +53,7 @@ public SerdesInitializer() { .put(AvroEmbeddedSerde.NAME, AvroEmbeddedSerde.class) .put(Base64Serde.NAME, Base64Serde.class) .put(HexSerde.NAME, HexSerde.class) + .put(MessagePackSerde.NAME, MessagePackSerde.class) .put(UuidBinarySerde.NAME, UuidBinarySerde.class) .put(ProtobufRawSerde.NAME, ProtobufRawSerde.class) diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java new file mode 100644 index 000000000..3d7e054ba --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java @@ -0,0 +1,56 @@ +package io.kafbat.ui.serdes.builtin; + +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.BuiltInSerde; +import java.io.IOException; +import java.util.Map; +import org.msgpack.core.MessageBufferPacker; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.value.Value; + + +public class MessagePackSerde implements BuiltInSerde { + public static final String NAME = "MessagePack"; + + @Override + public boolean canDeserialize(String topic, Serde.Target type) { + return true; + } + + @Override + public boolean canSerialize(String topic, Serde.Target type) { + return true; + } + + @Override + public Serde.Serializer serializer(String topic, Serde.Target type) { + return inputString -> { + inputString = inputString.trim(); + // it is actually a hack to provide ability to sent empty array as a key/value + if (inputString.isEmpty()) { + return new byte[] {}; + } + try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { + packer.packString(inputString); + packer.close(); + return packer.toByteArray(); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to serialize MessagePack payload", e); + } + }; + } + + @Override + public Serde.Deserializer deserializer(String topic, Serde.Target type) { + return (headers, data) -> { + try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(data)) { + Value value = unpacker.unpackValue(); + return new DeserializeResult(value.toString(), DeserializeResult.Type.STRING, Map.of()); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to deserialize MessagePack payload", e); + } + }; + } +} From ecb523a8a40d8d16e042f563b6e56efc0ebc28d0 Mon Sep 17 00:00:00 2001 From: YeeaaahMan Date: Tue, 24 Mar 2026 23:19:37 +0200 Subject: [PATCH 2/4] BE: support MessagePackSerde, fix warning --- .../main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java | 1 - 1 file changed, 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java index 3d7e054ba..e81cc8897 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java @@ -34,7 +34,6 @@ public Serde.Serializer serializer(String topic, Serde.Target type) { } try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { packer.packString(inputString); - packer.close(); return packer.toByteArray(); } catch (IOException e) { throw new IllegalArgumentException("Failed to serialize MessagePack payload", e); From ff1e313f3af28da93b86c4d8ec46ce5b3e86d6e1 Mon Sep 17 00:00:00 2001 From: YeeaaahMan Date: Wed, 25 Mar 2026 21:56:05 +0200 Subject: [PATCH 3/4] BE: migrate MessagePack dependency to version catalog --- api/build.gradle | 3 ++- gradle/libs.versions.toml | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/api/build.gradle b/api/build.gradle index a46bdb50d..ce4b76e50 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -58,7 +58,8 @@ dependencies { implementation libs.confluent.avro.serializer implementation libs.confluent.protobuf.serializer implementation libs.confluent.json.schema.serializer - implementation 'org.msgpack:msgpack-core:0.9.11' + + implementation libs.msgpack.core implementation libs.aws.msk.auth implementation(libs.azure.identity) { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 966c2a5fb..bafea1cdf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,6 +16,7 @@ avro = '1.12.1' byte-buddy = '1.18.1' confluent = '7.9.5' confluent-ccs = '7.9.5-ccs' +msgpack = "0.9.11" mapstruct = '1.6.2' lombok = '1.18.42' @@ -90,6 +91,8 @@ confluent-avro-serializer = { module = 'io.confluent:kafka-avro-serializer', ver confluent-protobuf-serializer = { module = 'io.confluent:kafka-protobuf-serializer', version.ref = 'confluent' } confluent-json-schema-serializer = { module = 'io.confluent:kafka-json-schema-serializer', version.ref = 'confluent' } +msgpack-core = { group = "org.msgpack", name = "msgpack-core", version.ref = "msgpack" } + aws-msk-auth = { module = 'software.amazon.msk:aws-msk-iam-auth', version.ref = 'aws-msk-auth' } azure-identity = { module = 'com.azure:azure-identity', version.ref = 'azure-identity' } reactor-test = { module = 'io.projectreactor:reactor-test' } From 5ddcd64393949f67bb10b1b11f05112101ceb897 Mon Sep 17 00:00:00 2001 From: YeeaaahMan Date: Thu, 26 Mar 2026 21:45:00 +0200 Subject: [PATCH 4/4] MessagePackSerde: remove broken serializer + add unittests --- .../ui/serdes/builtin/MessagePackSerde.java | 23 ------- .../serdes/builtin/MessagePackSerdeTest.java | 68 +++++++++++++++++++ 2 files changed, 68 insertions(+), 23 deletions(-) create mode 100644 api/src/test/java/io/kafbat/ui/serdes/builtin/MessagePackSerdeTest.java diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java index e81cc8897..a2f2fd15b 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/MessagePackSerde.java @@ -5,7 +5,6 @@ import io.kafbat.ui.serdes.BuiltInSerde; import java.io.IOException; import java.util.Map; -import org.msgpack.core.MessageBufferPacker; import org.msgpack.core.MessagePack; import org.msgpack.core.MessageUnpacker; import org.msgpack.value.Value; @@ -19,28 +18,6 @@ public boolean canDeserialize(String topic, Serde.Target type) { return true; } - @Override - public boolean canSerialize(String topic, Serde.Target type) { - return true; - } - - @Override - public Serde.Serializer serializer(String topic, Serde.Target type) { - return inputString -> { - inputString = inputString.trim(); - // it is actually a hack to provide ability to sent empty array as a key/value - if (inputString.isEmpty()) { - return new byte[] {}; - } - try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) { - packer.packString(inputString); - return packer.toByteArray(); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to serialize MessagePack payload", e); - } - }; - } - @Override public Serde.Deserializer deserializer(String topic, Serde.Target type) { return (headers, data) -> { diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/MessagePackSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/MessagePackSerdeTest.java new file mode 100644 index 000000000..c35400f87 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/MessagePackSerdeTest.java @@ -0,0 +1,68 @@ +package io.kafbat.ui.serdes.builtin; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.PropertyResolverImpl; +import io.kafbat.ui.serdes.RecordHeadersImpl; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +public class MessagePackSerdeTest { + private static final String TEST_STRING = "{\"items\":[null,true,2]}"; + private static final byte[] TEST_BYTES = { + (byte) 0x81, + (byte) 0xa5, + (byte) 0x69, + (byte) 0x74, + (byte) 0x65, + (byte) 0x6d, + (byte) 0x73, + (byte) 0x93, + (byte) 0xc0, + (byte) 0xc3, + (byte) 0x02 + }; + + private Serde msgPackSerde; + + @BeforeEach + void init() { + msgPackSerde = new MessagePackSerde(); + msgPackSerde.configure( + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty() + ); + } + + @ParameterizedTest + @EnumSource + void getSchemaReturnsEmpty(Serde.Target type) { + assertThat(msgPackSerde.getSchema("anyTopic", type)).isEmpty(); + } + + @ParameterizedTest + @EnumSource + void canSerializeReturnsFalseForAllInput(Serde.Target type) { + assertThat(msgPackSerde.canSerialize("anyTopic", type)).isFalse(); + } + + @ParameterizedTest + @EnumSource + void canDeserializeReturnsTrueForAllInputs(Serde.Target type) { + assertThat(msgPackSerde.canDeserialize("anyTopic", type)).isTrue(); + } + + @ParameterizedTest + @EnumSource + void deserializesDataAsMessagePackBytes(Serde.Target type) { + var deserializer = msgPackSerde.deserializer("anyTopic", type); + var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES); + assertThat(result.getResult()).isEqualTo(TEST_STRING); + assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING); + assertThat(result.getAdditionalProperties()).isEmpty(); + } +}