diff --git a/api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java b/api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java index 0b4755ba3..439f69ded 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java @@ -1,7 +1,9 @@ package io.kafbat.ui.serdes; import io.kafbat.ui.serde.api.PropertyResolver; +import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serde.api.Serde; +import java.util.Optional; public interface BuiltInSerde extends Serde { @@ -24,4 +26,25 @@ default void configure(PropertyResolver serdeProperties, PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) { } + + @Override + default boolean canSerialize(String topic, Serde.Target type) { + return false; + } + + @Override + default Serde.Serializer serializer(String topic, Serde.Target type) { + throw new UnsupportedOperationException(); + } + + @Override + default Optional getSchema(String topic, Serde.Target type) { + return Optional.empty(); + } + + @Override + default Optional getDescription() { + return Optional.empty(); + } + } diff --git a/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java b/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java index fbdfdda48..b73ec193d 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java +++ b/api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java @@ -64,12 +64,12 @@ public Stream all() { public SerdeInstance suggestSerdeForSerialize(String topic, Serde.Target type) { return findSerdeByPatternsOrDefault(topic, type, s -> s.canSerialize(topic, type)) - .orElse(serdes.get(StringSerde.name())); + .orElse(serdes.get(StringSerde.NAME)); } public SerdeInstance suggestSerdeForDeserialize(String topic, Serde.Target type) { return findSerdeByPatternsOrDefault(topic, type, s -> s.canDeserialize(topic, type)) - .orElse(serdes.get(StringSerde.name())); + .orElse(serdes.get(StringSerde.NAME)); } @Override diff --git a/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java b/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java index bbdba76c4..3b2eb52fc 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java +++ b/api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java @@ -20,6 +20,9 @@ import io.kafbat.ui.serdes.builtin.UInt32Serde; import io.kafbat.ui.serdes.builtin.UInt64Serde; import io.kafbat.ui.serdes.builtin.UuidBinarySerde; +import io.kafbat.ui.serdes.builtin.mm2.CheckpointSerde; +import io.kafbat.ui.serdes.builtin.mm2.HeartbeatSerde; +import io.kafbat.ui.serdes.builtin.mm2.OffsetSyncSerde; import io.kafbat.ui.serdes.builtin.sr.SchemaRegistrySerde; import java.util.LinkedHashMap; import java.util.Map; @@ -39,18 +42,23 @@ public class SerdesInitializer { public SerdesInitializer() { this( ImmutableMap.>builder() - .put(StringSerde.name(), StringSerde.class) - .put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class) - .put(ProtobufFileSerde.name(), ProtobufFileSerde.class) - .put(Int32Serde.name(), Int32Serde.class) - .put(Int64Serde.name(), Int64Serde.class) - .put(UInt32Serde.name(), UInt32Serde.class) - .put(UInt64Serde.name(), UInt64Serde.class) - .put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class) - .put(Base64Serde.name(), Base64Serde.class) - .put(HexSerde.name(), HexSerde.class) - .put(UuidBinarySerde.name(), UuidBinarySerde.class) - .put(ProtobufRawSerde.name(), ProtobufRawSerde.class) + .put(StringSerde.NAME, StringSerde.class) + .put(SchemaRegistrySerde.NAME, SchemaRegistrySerde.class) + .put(ProtobufFileSerde.NAME, ProtobufFileSerde.class) + .put(Int32Serde.NAME, Int32Serde.class) + .put(Int64Serde.NAME, Int64Serde.class) + .put(UInt32Serde.NAME, UInt32Serde.class) + .put(UInt64Serde.NAME, UInt64Serde.class) + .put(AvroEmbeddedSerde.NAME, AvroEmbeddedSerde.class) + .put(Base64Serde.NAME, Base64Serde.class) + .put(HexSerde.NAME, HexSerde.class) + .put(UuidBinarySerde.NAME, UuidBinarySerde.class) + .put(ProtobufRawSerde.NAME, ProtobufRawSerde.class) + + // mm2 serdes + .put(HeartbeatSerde.NAME, HeartbeatSerde.class) + .put(OffsetSyncSerde.NAME, OffsetSyncSerde.class) + .put(CheckpointSerde.NAME, CheckpointSerde.class) .build(), new CustomSerdeLoader() ); @@ -131,8 +139,8 @@ public ClusterSerdes init(Environment env, .orElse(null), Optional.ofNullable(clusterProperties.getDefaultValueSerde()) .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found")) - .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name()))) - .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name()))) + .or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.NAME))) + .or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.NAME))) .orElse(null), createFallbackSerde() ); @@ -142,15 +150,16 @@ public ClusterSerdes init(Environment env, * Registers serdse that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde. */ private void registerTopicRelatedSerde(Map serdes) { - registerConsumerOffsetsSerde(serdes); + serdes.putAll(consumerOffsetsSerde()); + serdes.putAll(mirrorMakerSerdes()); } - private void registerConsumerOffsetsSerde(Map serdes) { + private Map consumerOffsetsSerde() { var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC); - serdes.put( - ConsumerOffsetsSerde.name(), + return Map.of( + ConsumerOffsetsSerde.NAME, new SerdeInstance( - ConsumerOffsetsSerde.name(), + ConsumerOffsetsSerde.NAME, new ConsumerOffsetsSerde(), pattern, pattern, @@ -159,6 +168,21 @@ private void registerConsumerOffsetsSerde(Map serdes) { ); } + private Map mirrorMakerSerdes() { + return Map.of( + HeartbeatSerde.NAME, + mirrorSerde(HeartbeatSerde.NAME, HeartbeatSerde.TOPIC_NAME_PATTERN, new HeartbeatSerde()), + OffsetSyncSerde.NAME, + mirrorSerde(OffsetSyncSerde.NAME, OffsetSyncSerde.TOPIC_NAME_PATTERN, new OffsetSyncSerde()), + CheckpointSerde.NAME, + mirrorSerde(CheckpointSerde.NAME, CheckpointSerde.TOPIC_NAME_PATTERN, new CheckpointSerde()) + ); + } + + private SerdeInstance mirrorSerde(String name, Pattern pattern, BuiltInSerde serde) { + return new SerdeInstance(name, serde, pattern, pattern, null); + } + private SerdeInstance createFallbackSerde() { StringSerde serde = new StringSerde(); serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty()); diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/AvroEmbeddedSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/AvroEmbeddedSerde.java index 8d5d3dfa5..60874d3cd 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/AvroEmbeddedSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/AvroEmbeddedSerde.java @@ -13,31 +13,13 @@ import org.apache.avro.generic.GenericDatumReader; public class AvroEmbeddedSerde implements BuiltInSerde { - - public static String name() { - return "Avro (Embedded)"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Target type) { - return Optional.empty(); - } + public static final String NAME = "Avro (Embedded)"; @Override public boolean canDeserialize(String topic, Target type) { return true; } - @Override - public boolean canSerialize(String topic, Target type) { - return false; - } - @Override public Serializer serializer(String topic, Target type) { throw new IllegalStateException(); diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java index 515354695..fff282714 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java @@ -1,28 +1,13 @@ package io.kafbat.ui.serdes.builtin; import io.kafbat.ui.serde.api.DeserializeResult; -import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serde.api.Serde; import io.kafbat.ui.serdes.BuiltInSerde; import java.util.Base64; import java.util.Map; -import java.util.Optional; public class Base64Serde implements BuiltInSerde { - - public static String name() { - return "Base64"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Serde.Target type) { - return Optional.empty(); - } + public static final String NAME = "Base64"; @Override public boolean canDeserialize(String topic, Serde.Target type) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ConsumerOffsetsSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ConsumerOffsetsSerde.java index fdf6d0c79..9246f705e 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/ConsumerOffsetsSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ConsumerOffsetsSerde.java @@ -1,31 +1,19 @@ package io.kafbat.ui.serdes.builtin; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; import io.kafbat.ui.serde.api.DeserializeResult; -import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serdes.BuiltInSerde; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Optional; -import lombok.SneakyThrows; import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.BoundField; import org.apache.kafka.common.protocol.types.CompactArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; // Deserialization logic and message's schemas can be found in // kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue) -public class ConsumerOffsetsSerde implements BuiltInSerde { - - private static final JsonMapper JSON_MAPPER = createMapper(); +public class ConsumerOffsetsSerde extends StructSerde implements BuiltInSerde { + public static final String NAME = "__consumer_offsets"; private static final String ASSIGNMENT = "assignment"; private static final String CLIENT_HOST = "client_host"; @@ -46,53 +34,11 @@ public class ConsumerOffsetsSerde implements BuiltInSerde { public static final String TOPIC = "__consumer_offsets"; - public static String name() { - return "__consumer_offsets"; - } - - private static JsonMapper createMapper() { - var module = new SimpleModule(); - module.addSerializer(Struct.class, new JsonSerializer<>() { - @Override - public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException { - gen.writeStartObject(); - for (BoundField field : value.schema().fields()) { - var fieldVal = value.get(field); - gen.writeObjectField(field.def.name, fieldVal); - } - gen.writeEndObject(); - } - }); - var mapper = new JsonMapper(); - mapper.registerModule(module); - return mapper; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Target type) { - return Optional.empty(); - } - @Override public boolean canDeserialize(String topic, Target type) { return topic.equals(TOPIC); } - @Override - public boolean canSerialize(String topic, Target type) { - return false; - } - - @Override - public Serializer serializer(String topic, Target type) { - throw new UnsupportedOperationException(); - } - @Override public Deserializer deserializer(String topic, Target type) { return switch (type) { @@ -304,8 +250,5 @@ private Deserializer valueDeserializer() { }; } - @SneakyThrows - private String toJson(Struct s) { - return JSON_MAPPER.writeValueAsString(s); - } + } diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java index ab7f66ebb..ab6467b52 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java @@ -2,20 +2,15 @@ import io.kafbat.ui.serde.api.DeserializeResult; import io.kafbat.ui.serde.api.PropertyResolver; -import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serdes.BuiltInSerde; import java.util.HexFormat; import java.util.Map; -import java.util.Optional; public class HexSerde implements BuiltInSerde { + public static final String NAME = "Hex"; private HexFormat deserializeHexFormat; - public static String name() { - return "Hex"; - } - @Override public void autoConfigure(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) { configure(" ", true); @@ -37,16 +32,6 @@ private void configure(String delim, boolean uppercase) { } } - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Target type) { - return Optional.empty(); - } - @Override public boolean canDeserialize(String topic, Target type) { return true; diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/Int32Serde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/Int32Serde.java index d402a78b9..d5f6163f0 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/Int32Serde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/Int32Serde.java @@ -8,15 +8,7 @@ import java.util.Optional; public class Int32Serde implements BuiltInSerde { - - public static String name() { - return "Int32"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } + public static final String NAME = "Int32"; @Override public Optional getSchema(String topic, Target type) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/Int64Serde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/Int64Serde.java index 0616eff2f..e4ffbfe92 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/Int64Serde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/Int64Serde.java @@ -9,15 +9,7 @@ import java.util.Optional; public class Int64Serde implements BuiltInSerde { - - public static String name() { - return "Int64"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } + public static final String NAME = "Int64"; @Override public Optional getSchema(String topic, Serde.Target type) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java index 9b78cc7b2..a2432c9f9 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java @@ -69,10 +69,7 @@ @Slf4j public class ProtobufFileSerde implements BuiltInSerde { - - public static String name() { - return "ProtobufFile"; - } + public static final String NAME = "ProtobufFile"; private static final ProtobufSchemaConverter SCHEMA_CONVERTER = new ProtobufSchemaConverter(); @@ -112,7 +109,7 @@ void configure(Configuration configuration) { && configuration.defaultKeyMessageDescriptor() == null && configuration.messageDescriptorMap().isEmpty() && configuration.keyMessageDescriptorMap().isEmpty()) { - throw new ValidationException("Neither default, nor per-topic descriptors defined for " + name() + " serde"); + throw new ValidationException("Neither default, nor per-topic descriptors defined for " + NAME + " serde"); } this.defaultMessageDescriptor = configuration.defaultMessageDescriptor(); this.defaultKeyMessageDescriptor = configuration.defaultKeyMessageDescriptor(); @@ -121,11 +118,6 @@ void configure(Configuration configuration) { this.keyMessageDescriptorMap = configuration.keyMessageDescriptorMap(); } - @Override - public Optional getDescription() { - return Optional.empty(); - } - private Optional descriptorFor(String topic, Serde.Target type) { return type == Serde.Target.KEY ? diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerde.java index f55ddd93a..bb2e1eac3 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerde.java @@ -12,35 +12,13 @@ import lombok.SneakyThrows; public class ProtobufRawSerde implements BuiltInSerde { - - public static String name() { - return "ProtobufDecodeRaw"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Serde.Target type) { - return Optional.empty(); - } - - @Override - public boolean canSerialize(String topic, Serde.Target type) { - return false; - } + public static final String NAME = "ProtobufDecodeRaw"; @Override public boolean canDeserialize(String topic, Serde.Target type) { return true; } - @Override - public Serde.Serializer serializer(String topic, Serde.Target type) { - throw new UnsupportedOperationException(); - } @Override public Serde.Deserializer deserializer(String topic, Serde.Target type) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/StringSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/StringSerde.java index d4351bbc7..2ce85e763 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/StringSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/StringSerde.java @@ -2,18 +2,13 @@ import io.kafbat.ui.serde.api.DeserializeResult; import io.kafbat.ui.serde.api.PropertyResolver; -import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serdes.BuiltInSerde; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.Optional; public class StringSerde implements BuiltInSerde { - - public static String name() { - return "String"; - } + public static final String NAME = "String"; private Charset encoding = StandardCharsets.UTF_8; @@ -26,16 +21,6 @@ public void configure(PropertyResolver serdeProperties, .ifPresent(e -> StringSerde.this.encoding = e); } - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Target type) { - return Optional.empty(); - } - @Override public boolean canDeserialize(String topic, Target type) { return true; diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/StructSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/StructSerde.java new file mode 100644 index 000000000..6e7e049ee --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/StructSerde.java @@ -0,0 +1,46 @@ +package io.kafbat.ui.serdes.builtin; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.BuiltInSerde; +import java.io.IOException; +import lombok.SneakyThrows; +import org.apache.kafka.common.protocol.types.BoundField; +import org.apache.kafka.common.protocol.types.Struct; + +public abstract class StructSerde implements BuiltInSerde { + + private static final JsonMapper JSON_MAPPER = createMapper(); + + private static JsonMapper createMapper() { + var module = new SimpleModule(); + module.addSerializer(Struct.class, new JsonSerializer<>() { + @Override + public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeStartObject(); + for (BoundField field : value.schema().fields()) { + var fieldVal = value.get(field); + gen.writeObjectField(field.def.name, fieldVal); + } + gen.writeEndObject(); + } + }); + var mapper = new JsonMapper(); + mapper.registerModule(module); + return mapper; + } + + @Override + public boolean canDeserialize(String topic, Serde.Target type) { + return true; + } + + @SneakyThrows + protected String toJson(Struct s) { + return JSON_MAPPER.writeValueAsString(s); + } +} diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt32Serde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt32Serde.java index a767e1a62..63dce83dc 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt32Serde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt32Serde.java @@ -10,15 +10,7 @@ import java.util.Optional; public class UInt32Serde implements BuiltInSerde { - - public static String name() { - return "UInt32"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } + public static final String NAME = "UInt32"; @Override public Optional getSchema(String topic, Serde.Target type) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt64Serde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt64Serde.java index 4b090329a..e89274c31 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt64Serde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/UInt64Serde.java @@ -10,15 +10,7 @@ public class UInt64Serde implements BuiltInSerde { - - public static String name() { - return "UInt64"; - } - - @Override - public Optional getDescription() { - return Optional.empty(); - } + public static final String NAME = "UInt64"; @Override public Optional getSchema(String topic, Target type) { diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/UuidBinarySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/UuidBinarySerde.java index f94232aee..0a9178fc6 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/UuidBinarySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/UuidBinarySerde.java @@ -3,20 +3,15 @@ import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.serde.api.DeserializeResult; import io.kafbat.ui.serde.api.PropertyResolver; -import io.kafbat.ui.serde.api.SchemaDescription; import io.kafbat.ui.serde.api.Serde; import io.kafbat.ui.serdes.BuiltInSerde; import java.nio.ByteBuffer; import java.util.Map; -import java.util.Optional; import java.util.UUID; public class UuidBinarySerde implements BuiltInSerde { - - public static String name() { - return "UUIDBinary"; - } + public static final String NAME = "UUIDBinary"; private boolean mostSignificantBitsFirst = true; @@ -28,16 +23,6 @@ public void configure(PropertyResolver serdeProperties, .ifPresent(msb -> UuidBinarySerde.this.mostSignificantBitsFirst = msb); } - @Override - public Optional getDescription() { - return Optional.empty(); - } - - @Override - public Optional getSchema(String topic, Serde.Target type) { - return Optional.empty(); - } - @Override public boolean canDeserialize(String topic, Serde.Target type) { return true; diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java new file mode 100644 index 000000000..01ddfad05 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerde.java @@ -0,0 +1,52 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import io.kafbat.ui.serdes.BuiltInSerde; +import java.util.Optional; +import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Type; + +@Slf4j +public class CheckpointSerde extends MirrorMakerSerde implements BuiltInSerde { + public static final String NAME = "mm2-Checkpoint"; + public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(".*\\.checkpoints\\.internal"); + + private static final String TOPIC_KEY = "topic"; + private static final String PARTITION_KEY = "partition"; + private static final String CONSUMER_GROUP_ID_KEY = "group"; + private static final String UPSTREAM_OFFSET_KEY = "upstreamOffset"; + private static final String DOWNSTREAM_OFFSET_KEY = "offset"; + private static final String METADATA_KEY = "metadata"; + + private static final Schema VALUE_SCHEMA_V0 = new Schema( + new Field(UPSTREAM_OFFSET_KEY, Type.INT64), + new Field(DOWNSTREAM_OFFSET_KEY, Type.INT64), + new Field(METADATA_KEY, Type.STRING)); + + private static final Schema KEY_SCHEMA = new Schema( + new Field(CONSUMER_GROUP_ID_KEY, Type.STRING), + new Field(TOPIC_KEY, Type.STRING), + new Field(PARTITION_KEY, Type.INT32)); + + public CheckpointSerde() { + super(true); + } + + @Override + protected Schema getKeySchema() { + return KEY_SCHEMA; + } + + @Override + protected Optional getVersionedValueSchema(short version) { + if (version == 0) { + return Optional.of(VALUE_SCHEMA_V0); + } else { + log.warn("Unsupported version of CheckpointSerde: {}", version); + return Optional.empty(); + } + } + +} diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java new file mode 100644 index 000000000..92f05e442 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerde.java @@ -0,0 +1,44 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import io.kafbat.ui.serdes.BuiltInSerde; +import java.util.Optional; +import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Type; + +@Slf4j +public class HeartbeatSerde extends MirrorMakerSerde implements BuiltInSerde { + public static final String NAME = "mm2-Heartbeat"; + public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("heartbeats"); + + private static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias"; + private static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias"; + private static final String TIMESTAMP_KEY = "timestamp"; + + private static final Schema VALUE_SCHEMA_V0 = new Schema( + new Field(TIMESTAMP_KEY, Type.INT64)); + + private static final Schema KEY_SCHEMA = new Schema( + new Field(SOURCE_CLUSTER_ALIAS_KEY, Type.STRING), + new Field(TARGET_CLUSTER_ALIAS_KEY, Type.STRING)); + + public HeartbeatSerde() { + super(true); + } + + protected Schema getKeySchema() { + return KEY_SCHEMA; + } + + @Override + protected Optional getVersionedValueSchema(short version) { + if (version == 0) { + return Optional.of(VALUE_SCHEMA_V0); + } else { + log.warn("Unsupported version of HeartbeatSerde: {}", version); + return Optional.empty(); + } + } +} diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerde.java new file mode 100644 index 000000000..c0cfc8e02 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerde.java @@ -0,0 +1,57 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serdes.BuiltInSerde; +import io.kafbat.ui.serdes.builtin.StructSerde; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +@RequiredArgsConstructor +abstract class MirrorMakerSerde extends StructSerde implements BuiltInSerde { + + protected final boolean versioned; + + @Override + public Deserializer deserializer(String topic, Target target) { + return (recordHeaders, bytes) -> + new DeserializeResult(toJson(switch (target) { + case KEY -> deserializeKey(bytes); + case VALUE -> deserializeValue(bytes); + }), DeserializeResult.Type.JSON, Map.of()); + } + + protected Struct deserializeKey(byte[] bytes) { + return getKeySchema().read(ByteBuffer.wrap(bytes)); + } + + protected Struct deserializeValue(byte[] bytes) { + ByteBuffer wrap = ByteBuffer.wrap(bytes); + Optional valueSchema; + if (versioned) { + short version = wrap.getShort(); + valueSchema = getVersionedValueSchema(version); + } else { + valueSchema = getValueSchema(); + } + if (valueSchema.isPresent()) { + return valueSchema.get().read(wrap); + } else { + throw new IllegalStateException("Value schema was not present"); + } + } + + protected abstract Schema getKeySchema(); + + protected Optional getValueSchema() { + return Optional.empty(); + } + + protected Optional getVersionedValueSchema(short version) { + throw new UnsupportedOperationException("Versioned value schema is not supported"); + } + +} diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java new file mode 100644 index 000000000..d80415c93 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerde.java @@ -0,0 +1,43 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import io.kafbat.ui.serdes.BuiltInSerde; +import java.util.Optional; +import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Type; + +@Slf4j +public class OffsetSyncSerde extends MirrorMakerSerde implements BuiltInSerde { + public static final String NAME = "mm2-OffsetSync"; + public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("mm2-offset-syncs\\..*\\.internal"); + + private static final Schema VALUE_SCHEMA; + private static final Schema KEY_SCHEMA; + + static { + VALUE_SCHEMA = new Schema( + new Field("upstreamOffset", Type.INT64), + new Field("offset", Type.INT64) + ); + KEY_SCHEMA = new Schema( + new Field("topic", Type.STRING), + new Field("partition", Type.INT32) + ); + } + + public OffsetSyncSerde() { + super(false); + } + + @Override + protected Schema getKeySchema() { + return KEY_SCHEMA; + } + + @Override + protected Optional getValueSchema() { + return Optional.of(VALUE_SCHEMA); + } +} diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index d6f7a3699..91c2375d8 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -39,14 +39,10 @@ public class SchemaRegistrySerde implements BuiltInSerde { - + public static final String NAME = "SchemaRegistry"; private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0; private static final int SR_PAYLOAD_PREFIX_LENGTH = 5; - public static String name() { - return "SchemaRegistry"; - } - private static final String SCHEMA_REGISTRY = "schemaRegistry"; private SchemaRegistryClient schemaRegistryClient; @@ -174,11 +170,6 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls ); } - @Override - public Optional getDescription() { - return Optional.empty(); - } - @Override public boolean canDeserialize(String topic, Target type) { String subject = schemaSubject(topic, type); @@ -213,7 +204,7 @@ public Optional getSchema(String topic, Target type) { @SneakyThrows private String convertSchema(SchemaMetadata schema, ParsedSchema parsedSchema) { - URI basePath = new URI(schemaRegistryUrls.get(0)) + URI basePath = new URI(schemaRegistryUrls.getFirst()) .resolve(Integer.toString(schema.getId())); SchemaType schemaType = SchemaType.fromString(schema.getSchemaType()) .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType())); @@ -308,7 +299,7 @@ private int extractSchemaIdFromMsg(byte[] data) { throw new ValidationException( String.format( "Data doesn't contain magic byte and schema id prefix, so it can't be deserialized with %s serde", - name()) + NAME) ); } } diff --git a/api/src/test/java/io/kafbat/ui/emitter/CursorTest.java b/api/src/test/java/io/kafbat/ui/emitter/CursorTest.java index 5a262b132..095c373a7 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/CursorTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/CursorTest.java @@ -181,11 +181,11 @@ private static ConsumerRecordDeserializer createRecordsDeserializer() { Serde s = new StringSerde(); s.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty()); return new ConsumerRecordDeserializer( - StringSerde.name(), + StringSerde.NAME, s.deserializer(null, Serde.Target.KEY), - StringSerde.name(), + StringSerde.NAME, s.deserializer(null, Serde.Target.VALUE), - StringSerde.name(), + StringSerde.NAME, s.deserializer(null, Serde.Target.KEY), s.deserializer(null, Serde.Target.VALUE), msg -> msg diff --git a/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java b/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java index c1130c9d2..f3c11ff62 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/TailingEmitterTest.java @@ -114,8 +114,8 @@ private Flux createTailingFlux( query, null, 0, - StringSerde.name(), - StringSerde.name()); + StringSerde.NAME, + StringSerde.NAME); } private List startTailing(String filterQuery) { diff --git a/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java b/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java index 0d6fc36a3..8b559caa0 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/SerdesInitializerTest.java @@ -31,8 +31,8 @@ class SerdesInitializerTest { Map.of( "BuiltIn1", BuiltInSerdeWithAutoconfigure.class, "BuiltIn2", BuiltInSerdeMock2NoAutoConfigure.class, - Int32Serde.name(), Int32Serde.class, - StringSerde.name(), StringSerde.class + Int32Serde.NAME, Int32Serde.class, + StringSerde.NAME, StringSerde.class ), customSerdeLoaderMock ); diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerdeTest.java new file mode 100644 index 000000000..51ddb9f67 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/CheckpointSerdeTest.java @@ -0,0 +1,65 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.PropertyResolverImpl; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class CheckpointSerdeTest extends MirrorMakerSerdesAbstractTest { + + private static final CheckpointSerde SERDE = new CheckpointSerde(); + + @BeforeEach + void init() { + SERDE.configure( + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty() + ); + } + + @Test + void testCanDeserialize() { + assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.KEY)); + assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.VALUE)); + } + + @Test + void testDeserializeKey() throws JsonProcessingException { + var key = decodeBase64("AAVncm91cAAFdG9waWMAAAAD"); + var expected = Map.of( + "partition", 3, + "topic", "topic", + "group", "group" + ); + + var result = SERDE.deserializer(TOPIC, Serde.Target.KEY).deserialize(HEADERS, key); + var resultMap = jsonToMap(result.getResult()); + + assertEquals(DeserializeResult.Type.JSON, result.getType()); + assertEquals(expected, resultMap); + } + + @Test + void testDeserializeValue() throws JsonProcessingException { + var value = decodeBase64("AAAAAAAAA/bkPgAAAAHz1jf7AAA="); + var expected = Map.of( + "offset", 8385869819L, + "upstreamOffset", 66511934, + "metadata", "" + ); + + var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value); + var resultMap = jsonToMap(result.getResult()); + + assertEquals(DeserializeResult.Type.JSON, result.getType()); + assertEquals(expected, resultMap); + } + +} diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerdeTest.java new file mode 100644 index 000000000..a3dacd149 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/HeartbeatSerdeTest.java @@ -0,0 +1,60 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.PropertyResolverImpl; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class HeartbeatSerdeTest extends MirrorMakerSerdesAbstractTest { + + private static final HeartbeatSerde SERDE = new HeartbeatSerde(); + + @BeforeEach + void init() { + SERDE.configure( + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty() + ); + } + + @Test + void testCanDeserialize() { + assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.KEY)); + assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.VALUE)); + } + + @Test + void testDeserializeKey() throws JsonProcessingException { + var key = decodeBase64("AAZzb3VyY2UABnRhcmdldA=="); + var expected = Map.of( + "sourceClusterAlias", "source", + "targetClusterAlias", "target" + ); + + var result = SERDE.deserializer(TOPIC, Serde.Target.KEY).deserialize(HEADERS, key); + var resultMap = jsonToMap(result.getResult()); + + assertEquals(DeserializeResult.Type.JSON, result.getType()); + assertEquals(expected, resultMap); + } + + @Test + void testDeserializeValue() throws JsonProcessingException { + var value = decodeBase64("AAAAAAGZgCEMZA=="); + var expected = Map.of("timestamp", 1758791273572L); + + var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value); + var resultMap = jsonToMap(result.getResult()); + + assertEquals(DeserializeResult.Type.JSON, result.getType()); + assertEquals(expected, resultMap); + } + +} diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerdesAbstractTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerdesAbstractTest.java new file mode 100644 index 000000000..91c6634a2 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/MirrorMakerSerdesAbstractTest.java @@ -0,0 +1,26 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.kafbat.ui.serdes.RecordHeadersImpl; +import java.util.Base64; +import java.util.Map; + +public abstract class MirrorMakerSerdesAbstractTest { + + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + protected static final String TOPIC = "test-topic"; + protected static final RecordHeadersImpl HEADERS = new RecordHeadersImpl(); + + protected Map jsonToMap(String json) throws JsonProcessingException { + //@formatter:off + return OBJECT_MAPPER.readValue(json, new TypeReference<>() {}); + //@formatter:on + } + + protected static byte[] decodeBase64(String base64) { + return Base64.getDecoder().decode(base64.trim()); + } + +} diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerdeTest.java new file mode 100644 index 000000000..447b33e92 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/mm2/OffsetSyncSerdeTest.java @@ -0,0 +1,63 @@ +package io.kafbat.ui.serdes.builtin.mm2; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.kafbat.ui.serde.api.DeserializeResult; +import io.kafbat.ui.serde.api.Serde; +import io.kafbat.ui.serdes.PropertyResolverImpl; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class OffsetSyncSerdeTest extends MirrorMakerSerdesAbstractTest { + + private static final OffsetSyncSerde SERDE = new OffsetSyncSerde(); + + @BeforeEach + void init() { + SERDE.configure( + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty(), + PropertyResolverImpl.empty() + ); + } + + @Test + void testCanDeserialize() { + assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.KEY)); + assertTrue(SERDE.canDeserialize(TOPIC, Serde.Target.VALUE)); + } + + @Test + void testDeserializeKey() throws JsonProcessingException { + var key = decodeBase64("AAl0b3BpY25hbWUAAAAA"); + var expected = Map.of( + "partition", 0, + "topic", "topicname" + ); + + var result = SERDE.deserializer(TOPIC, Serde.Target.KEY).deserialize(HEADERS, key); + var resultMap = jsonToMap(result.getResult()); + + assertEquals(DeserializeResult.Type.JSON, result.getType()); + assertEquals(expected, resultMap); + } + + @Test + void testDeserializeValue() throws JsonProcessingException { + var value = decodeBase64("AAAAAAACXsoAAAAAAAHMfw=="); + var expected = Map.of( + "offset", 117887, + "upstreamOffset", 155338 + ); + + var result = SERDE.deserializer(TOPIC, Serde.Target.VALUE).deserialize(HEADERS, value); + var resultMap = jsonToMap(result.getResult()); + + assertEquals(DeserializeResult.Type.JSON, result.getType()); + assertEquals(expected, resultMap); + } + +} diff --git a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java index c0d02f39a..dc8f37711 100644 --- a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java @@ -97,8 +97,8 @@ void maskingAppliedOnConfiguredClusters() throws Exception { null, null, 100, - StringSerde.name(), - StringSerde.name() + StringSerde.NAME, + StringSerde.NAME ).filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE) .map(TopicMessageEventDTO::getMessage); @@ -128,7 +128,7 @@ void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingM Flux msgsFlux = messagesService.loadMessages( cluster, testTopic, new ConsumerPosition(mode, testTopic, List.of(), null, null), - null, null, pageSize, StringSerde.name(), StringSerde.name()) + null, null, pageSize, StringSerde.NAME, StringSerde.NAME) .doOnNext(evt -> { if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) { assertThat(evt.getCursor()).isNotNull(); @@ -230,9 +230,9 @@ void sendMessageWithProtobufAnyType() { CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO() .key(null) .partition(0) - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) .value(jsonContent) - .valueSerde(ProtobufFileSerde.name()); + .valueSerde(ProtobufFileSerde.NAME); String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID(); createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1)); diff --git a/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java b/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java index cc395ed41..b10e000fa 100644 --- a/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java @@ -106,11 +106,11 @@ private static ConsumerRecordDeserializer createRecordsDeserializer() { Serde s = new StringSerde(); s.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty()); return new ConsumerRecordDeserializer( - StringSerde.name(), + StringSerde.NAME, s.deserializer(null, Serde.Target.KEY), - StringSerde.name(), + StringSerde.NAME, s.deserializer(null, Serde.Target.VALUE), - StringSerde.name(), + StringSerde.NAME, s.deserializer(null, Serde.Target.KEY), s.deserializer(null, Serde.Target.VALUE), msg -> msg diff --git a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java index efed56ded..7e21112c1 100644 --- a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java +++ b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java @@ -142,9 +142,9 @@ void noSchemaStringKeyStringValue() { .withMsgToSend( new CreateTopicMessageDTO() .key("testKey") - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) .value("testValue") - .valueSerde(StringSerde.name()) + .valueSerde(StringSerde.NAME) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("testKey"); @@ -158,9 +158,9 @@ void keyIsIntValueIsLong() { .withMsgToSend( new CreateTopicMessageDTO() .key("123") - .keySerde(Int32Serde.name()) + .keySerde(Int32Serde.NAME) .value("21474836470") - .valueSerde(Int64Serde.name()) + .valueSerde(Int64Serde.NAME) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("123"); @@ -174,9 +174,9 @@ void keyIsNull() { .withMsgToSend( new CreateTopicMessageDTO() .key(null) - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) .value("testValue") - .valueSerde(StringSerde.name()) + .valueSerde(StringSerde.NAME) ) .doAssert(polled -> { assertThat(polled.getKey()).isNull(); @@ -190,9 +190,9 @@ void valueIsNull() { .withMsgToSend( new CreateTopicMessageDTO() .key("testKey") - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) .value(null) - .valueSerde(StringSerde.name()) + .valueSerde(StringSerde.NAME) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("testKey"); @@ -208,9 +208,9 @@ void primitiveAvroSchemas() { .withMsgToSend( new CreateTopicMessageDTO() .key("\"some string\"") - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value("123") - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("\"some string\""); @@ -226,9 +226,9 @@ void recordAvroSchema() { .withMsgToSend( new CreateTopicMessageDTO() .key(AVRO_SCHEMA_1_JSON_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(AVRO_SCHEMA_2_JSON_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); @@ -243,9 +243,9 @@ void keyWithNoSchemaValueWithProtoSchema() { .withMsgToSend( new CreateTopicMessageDTO() .key("testKey") - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) .value(PROTOBUF_SCHEMA_JSON_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertThat(polled.getKey()).isEqualTo("testKey"); @@ -261,9 +261,9 @@ void keyWithAvroSchemaValueWithAvroSchemaKeyIsNull() { .withMsgToSend( new CreateTopicMessageDTO() .key(null) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(AVRO_SCHEMA_2_JSON_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { @@ -278,10 +278,10 @@ void valueWithAvroSchemaShouldThrowExceptionIfArgIsNotValidJsonObject() { .withValueSchema(AVRO_SCHEMA_2) .withMsgToSend( new CreateTopicMessageDTO() - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) // f2 has type int instead of string .value("{ \"f1\": 111, \"f2\": 123 }") - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .assertSendThrowsException(); } @@ -294,9 +294,9 @@ void keyWithAvroSchemaValueWithProtoSchema() { .withMsgToSend( new CreateTopicMessageDTO() .key(AVRO_SCHEMA_1_JSON_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(PROTOBUF_SCHEMA_JSON_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); @@ -311,10 +311,10 @@ void valueWithProtoSchemaShouldThrowExceptionArgIsNotValidJsonObject() { .withMsgToSend( new CreateTopicMessageDTO() .key(null) - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) // f2 field has type object instead of int .value("{ \"f1\" : \"test str\", \"f2\" : {} }") - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .assertSendThrowsException(); } @@ -327,9 +327,9 @@ void keyWithProtoSchemaValueWithJsonSchema() { .withMsgToSend( new CreateTopicMessageDTO() .key(PROTOBUF_SCHEMA_JSON_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(JSON_SCHEMA_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD); @@ -344,10 +344,10 @@ void valueWithJsonSchemaThrowsExceptionIfArgIsNotValidJsonObject() { .withMsgToSend( new CreateTopicMessageDTO() .key(null) - .keySerde(StringSerde.name()) + .keySerde(StringSerde.NAME) // 'f2' field has type object instead of string .value("{ \"f1\": 12, \"f2\": {}, \"schema\": \"some txt\" }") - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .assertSendThrowsException(); } @@ -360,9 +360,9 @@ void topicMessageMetadataAvro() { .withMsgToSend( new CreateTopicMessageDTO() .key(AVRO_SCHEMA_1_JSON_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(AVRO_SCHEMA_2_JSON_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), AVRO_SCHEMA_1_JSON_RECORD); @@ -385,9 +385,9 @@ void topicMessageMetadataProtobuf() { .withMsgToSend( new CreateTopicMessageDTO() .key(PROTOBUF_SCHEMA_JSON_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(PROTOBUF_SCHEMA_JSON_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) ) .doAssert(polled -> { assertJsonEqual(polled.getKey(), PROTOBUF_SCHEMA_JSON_RECORD); @@ -409,9 +409,9 @@ void topicMessageMetadataJson() { .withMsgToSend( new CreateTopicMessageDTO() .key(JSON_SCHEMA_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(JSON_SCHEMA_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) .headers(Map.of("header1", "value1")) ) .doAssert(polled -> { @@ -435,9 +435,9 @@ void headerValueNullPresentTest() { .withMsgToSend( new CreateTopicMessageDTO() .key(JSON_SCHEMA_RECORD) - .keySerde(SchemaRegistrySerde.name()) + .keySerde(SchemaRegistrySerde.NAME) .value(JSON_SCHEMA_RECORD) - .valueSerde(SchemaRegistrySerde.name()) + .valueSerde(SchemaRegistrySerde.NAME) .headers(Collections.singletonMap("header123", null)) ) .doAssert(polled -> assertThat(polled.getHeaders().get("header123")).isNull()); @@ -450,9 +450,9 @@ void noKeyAndNoContentPresentTest() { .withMsgToSend( new CreateTopicMessageDTO() .key(null) - .keySerde(StringSerde.name()) // any serde + .keySerde(StringSerde.NAME) // any serde .value(null) - .valueSerde(StringSerde.name()) // any serde + .valueSerde(StringSerde.NAME) // any serde ) .doAssert(polled -> { assertThat(polled.getKey()).isNull();