Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ dependencies {
implementation libs.confluent.avro.serializer
implementation libs.confluent.protobuf.serializer
implementation libs.confluent.json.schema.serializer
implementation 'org.msgpack:msgpack-core:0.9.11'

implementation libs.aws.msk.auth
implementation(libs.azure.identity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.kafbat.ui.serdes.builtin.HexSerde;
import io.kafbat.ui.serdes.builtin.Int32Serde;
import io.kafbat.ui.serdes.builtin.Int64Serde;
import io.kafbat.ui.serdes.builtin.MessagePackSerde;
import io.kafbat.ui.serdes.builtin.ProtobufFileSerde;
import io.kafbat.ui.serdes.builtin.ProtobufRawSerde;
import io.kafbat.ui.serdes.builtin.StringSerde;
Expand Down Expand Up @@ -52,6 +53,7 @@ public SerdesInitializer() {
.put(AvroEmbeddedSerde.NAME, AvroEmbeddedSerde.class)
.put(Base64Serde.NAME, Base64Serde.class)
.put(HexSerde.NAME, HexSerde.class)
.put(MessagePackSerde.NAME, MessagePackSerde.class)
.put(UuidBinarySerde.NAME, UuidBinarySerde.class)
.put(ProtobufRawSerde.NAME, ProtobufRawSerde.class)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.kafbat.ui.serdes.builtin;

import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.Serde;
import io.kafbat.ui.serdes.BuiltInSerde;
import java.io.IOException;
import java.util.Map;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.value.Value;


public class MessagePackSerde implements BuiltInSerde {
public static final String NAME = "MessagePack";

@Override
public boolean canDeserialize(String topic, Serde.Target type) {
return true;
}

@Override
public boolean canSerialize(String topic, Serde.Target type) {
return true;
}

@Override
public Serde.Serializer serializer(String topic, Serde.Target type) {
return inputString -> {
inputString = inputString.trim();
// it is actually a hack to provide ability to sent empty array as a key/value
if (inputString.isEmpty()) {
return new byte[] {};
}
try (MessageBufferPacker packer = MessagePack.newDefaultBufferPacker()) {
packer.packString(inputString);
return packer.toByteArray();
} catch (IOException e) {
throw new IllegalArgumentException("Failed to serialize MessagePack payload", e);
}
};
}

@Override
public Serde.Deserializer deserializer(String topic, Serde.Target type) {
return (headers, data) -> {
try (MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(data)) {
Value value = unpacker.unpackValue();
return new DeserializeResult(value.toString(), DeserializeResult.Type.STRING, Map.of());
} catch (IOException e) {
throw new IllegalArgumentException("Failed to deserialize MessagePack payload", e);
}
};
}
}
Loading