Skip to content

Commit ba4e174

Browse files
author
Ildar Almakaev
authored
ISSUE-166 Pass ProtobufSchemaProvider to CachedSchemaRegistryClient to deserialize protobuf records (#178)
1 parent 40d8564 commit ba4e174

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SchemaRegistryRecordDeserializer.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import com.google.protobuf.Message;
66
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
7+
import io.confluent.kafka.schemaregistry.SchemaProvider;
78
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
89
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
910
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
1011
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
1112
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
1213
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
14+
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
1315
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
1416
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
1517
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
@@ -41,14 +43,17 @@ public SchemaRegistryRecordDeserializer(KafkaCluster cluster, ObjectMapper objec
4143
this.cluster = cluster;
4244
this.objectMapper = objectMapper;
4345

44-
this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry()).map(e ->
45-
new CachedSchemaRegistryClient(
46-
Collections.singletonList(e),
47-
CLIENT_IDENTITY_MAP_CAPACITY,
48-
Collections.singletonList(new AvroSchemaProvider()),
49-
Collections.emptyMap()
50-
)
51-
).orElse(null);
46+
this.schemaRegistryClient = Optional.ofNullable(cluster.getSchemaRegistry())
47+
.map(schemaRegistryUrl -> {
48+
List<SchemaProvider> schemaProviders = List.of(new AvroSchemaProvider(), new ProtobufSchemaProvider());
49+
return new CachedSchemaRegistryClient(
50+
Collections.singletonList(schemaRegistryUrl),
51+
CLIENT_IDENTITY_MAP_CAPACITY,
52+
schemaProviders,
53+
Collections.emptyMap()
54+
);
55+
}
56+
).orElse(null);
5257

5358
this.avroDeserializer = Optional.ofNullable(this.schemaRegistryClient)
5459
.map(KafkaAvroDeserializer::new)

0 commit comments

Comments
 (0)