Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.protobuf.StructProto;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.type.ColorProto;
Expand Down Expand Up @@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) {
@Override
public Serde.Serializer serializer(String topic, Serde.Target type) {
var descriptor = descriptorFor(topic, type).orElseThrow();
TypeRegistry typeRegistry = TypeRegistry.newBuilder()
.add(descriptorPaths.keySet())
.build();

return new Serde.Serializer() {
@SneakyThrows
@Override
public byte[] serialize(String input) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
JsonFormat.parser().merge(input, builder);
JsonFormat.parser()
.usingTypeRegistry(typeRegistry)
.merge(input, builder);
return builder.build().toByteArray();
}
};
Expand Down
14 changes: 14 additions & 0 deletions api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kafbat.ui.container.KafkaConnectContainer;
import io.kafbat.ui.container.KsqlDbContainer;
import io.kafbat.ui.container.SchemaRegistryContainer;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
Expand All @@ -22,6 +23,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.util.TestSocketUtils;
import org.springframework.util.ResourceUtils;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -75,6 +77,18 @@ public static class Initializer
public void initialize(@NotNull ConfigurableApplicationContext context) {
System.setProperty("kafka.clusters.0.name", LOCAL);
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());

// Add ProtobufFileSerde configuration
System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile");
System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*");
try {
System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir",
ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath());
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.MessageWithAny");

// List unavailable hosts to verify failover
System.setProperty("kafka.clusters.0.schemaRegistry",
String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ void setUp() throws Exception {
void loadsAllProtoFiledFromTargetDirectory() throws Exception {
var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
List<ProtoFile> files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load();
assertThat(files).hasSize(4);
assertThat(files).hasSize(5);
assertThat(files)
.map(f -> f.getLocation().getPath())
.containsExactlyInAnyOrder(
"language/language.proto",
"sensor.proto",
"address-book.proto",
"lang-description.proto"
"lang-description.proto",
"messagewithany.proto"
);
}

Expand Down
30 changes: 30 additions & 0 deletions api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.producer.KafkaTestProducer;
import io.kafbat.ui.serdes.builtin.ProtobufFileSerde;
import io.kafbat.ui.serdes.builtin.StringSerde;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -214,4 +215,33 @@ void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
assertThat(result.getError()).containsIgnoringCase("Compilation error");
}

@Test
void sendMessageWithProtobufAnyType() {
String jsonContent = """
{
"name": "testName",
"payload": {
"@type": "type.googleapis.com/test.PayloadMessage",
"id": "123"
}
}
""";

CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO()
.key(null)
.partition(0)
.keySerde(StringSerde.name())
.content(jsonContent)
.valueSerde(ProtobufFileSerde.name());

String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));

StepVerifier.create(messagesService.sendMessage(cluster, testTopic, testMessage))
.expectNextMatches(metadata -> metadata.topic().equals(testTopic)
&& metadata.partition() == 0
&& metadata.offset() >= 0)
.verifyComplete();
}

}
13 changes: 13 additions & 0 deletions api/src/test/resources/protobuf-serde/messagewithany.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package test;

import "google/protobuf/any.proto";

message MessageWithAny {
string name = 1;
google.protobuf.Any payload = 2;
}

message PayloadMessage {
string id = 1;
}
Loading