Skip to content

Commit 1f6153b

Browse files
committed
issues/650: ProtobufFile serde with proto files containing Any fix
1 parent e7df880 commit 1f6153b

File tree

4 files changed

+71
-1
lines changed

4 files changed

+71
-1
lines changed

api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.google.protobuf.StructProto;
1616
import com.google.protobuf.TimestampProto;
1717
import com.google.protobuf.TypeProto;
18+
import com.google.protobuf.TypeRegistry;
1819
import com.google.protobuf.WrappersProto;
1920
import com.google.protobuf.util.JsonFormat;
2021
import com.google.type.ColorProto;
@@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) {
147148
@Override
148149
public Serde.Serializer serializer(String topic, Serde.Target type) {
149150
var descriptor = descriptorFor(topic, type).orElseThrow();
151+
TypeRegistry typeRegistry = TypeRegistry.newBuilder()
152+
.add(descriptorPaths.keySet())
153+
.build();
154+
150155
return new Serde.Serializer() {
151156
@SneakyThrows
152157
@Override
153158
public byte[] serialize(String input) {
154159
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
155-
JsonFormat.parser().merge(input, builder);
160+
JsonFormat.parser()
161+
.usingTypeRegistry(typeRegistry)
162+
.merge(input, builder);
156163
return builder.build().toByteArray();
157164
}
158165
};

api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.kafbat.ui.container.KafkaConnectContainer;
66
import io.kafbat.ui.container.KsqlDbContainer;
77
import io.kafbat.ui.container.SchemaRegistryContainer;
8+
import java.io.FileNotFoundException;
89
import java.nio.file.Path;
910
import java.util.List;
1011
import java.util.Properties;
@@ -22,6 +23,7 @@
2223
import org.springframework.test.context.ActiveProfiles;
2324
import org.springframework.test.context.ContextConfiguration;
2425
import org.springframework.test.util.TestSocketUtils;
26+
import org.springframework.util.ResourceUtils;
2527
import org.testcontainers.containers.KafkaContainer;
2628
import org.testcontainers.containers.Network;
2729
import org.testcontainers.utility.DockerImageName;
@@ -75,6 +77,18 @@ public static class Initializer
7577
public void initialize(@NotNull ConfigurableApplicationContext context) {
7678
System.setProperty("kafka.clusters.0.name", LOCAL);
7779
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
80+
81+
// Add ProtobufFileSerde configuration
82+
System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile");
83+
System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*");
84+
try {
85+
System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir",
86+
ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath());
87+
} catch (FileNotFoundException e) {
88+
throw new RuntimeException(e);
89+
}
90+
System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.Main");
91+
7892
// List unavailable hosts to verify failover
7993
System.setProperty("kafka.clusters.0.schemaRegistry",
8094
String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",

api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
import io.kafbat.ui.model.TopicMessageDTO;
1515
import io.kafbat.ui.model.TopicMessageEventDTO;
1616
import io.kafbat.ui.producer.KafkaTestProducer;
17+
import io.kafbat.ui.serdes.builtin.Int32Serde;
18+
import io.kafbat.ui.serdes.builtin.Int64Serde;
19+
import io.kafbat.ui.serdes.builtin.ProtobufFileSerde;
20+
import io.kafbat.ui.serdes.builtin.ProtobufRawSerde;
1721
import io.kafbat.ui.serdes.builtin.StringSerde;
1822
import java.util.HashSet;
1923
import java.util.List;
@@ -22,13 +26,16 @@
2226
import java.util.UUID;
2327
import java.util.concurrent.atomic.AtomicReference;
2428
import org.apache.kafka.clients.admin.NewTopic;
29+
import org.apache.kafka.clients.producer.RecordMetadata;
2530
import org.junit.jupiter.api.AfterEach;
2631
import org.junit.jupiter.api.BeforeEach;
2732
import org.junit.jupiter.api.Test;
2833
import org.junit.jupiter.params.ParameterizedTest;
2934
import org.junit.jupiter.params.provider.CsvSource;
35+
import org.openapitools.jackson.nullable.JsonNullable;
3036
import org.springframework.beans.factory.annotation.Autowired;
3137
import reactor.core.publisher.Flux;
38+
import reactor.core.publisher.Mono;
3239
import reactor.test.StepVerifier;
3340

3441
class MessagesServiceTest extends AbstractIntegrationTest {
@@ -215,4 +222,33 @@ void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
215222
assertThat(result.getError()).containsIgnoringCase("Compilation error");
216223
}
217224

225+
@Test
226+
void sendMessageWithProtobufAnyType() {
227+
String jsonContent = """
228+
{
229+
"name": "testFromSpringApp",
230+
"payload": {
231+
"@type": "type.googleapis.com/test.Referenced",
232+
"id": "123"
233+
}
234+
}
235+
""";
236+
237+
CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO()
238+
.key(null)
239+
.partition(0)
240+
.keySerde(StringSerde.name())
241+
.content(jsonContent)
242+
.valueSerde(ProtobufFileSerde.name());
243+
244+
String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
245+
createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));
246+
247+
StepVerifier.create(messagesService.sendMessage(cluster, testTopic, testMessage))
248+
.expectNextMatches(metadata -> metadata.topic().equals(testTopic) &&
249+
metadata.partition() == 0 &&
250+
metadata.offset() >= 0)
251+
.verifyComplete();
252+
}
253+
218254
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
syntax = "proto3";
2+
package test;
3+
4+
import "google/protobuf/any.proto";
5+
6+
message Main {
7+
string name = 1;
8+
google.protobuf.Any payload = 2;
9+
}
10+
11+
message Referenced {
12+
string id = 1;
13+
}

0 commit comments

Comments
 (0)