diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java index 41bc08d0..6618c046 100644 --- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java +++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java @@ -11,15 +11,28 @@ public final class AvroMessageDeserializer implements MessageDeserializer { private final String topicName; private final KafkaAvroDeserializer deserializer; + private final AWSKafkaAvroDeserializer glueSchemaRegistryKafkaDeserializer; + + public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { this.topicName = topicName; - this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); + if("awsGlue") + { + this.glueSchemaRegistryKafkaDeserializer=getAWSDeserializer(schemaName, schemaRegistryName, awsRegion); + + }else{ + this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); + } + } @Override public String deserializeMessage(ByteBuffer buffer) { // Convert byte buffer to byte array final var bytes = ByteUtils.convertToByteArray(buffer); + if("awsGlue") + return glueSchemaRegistryKafkaDeserializer.deserialize(topicName, bytes).toString(); + else return deserializer.deserialize(topicName, bytes).toString(); } @@ -34,4 +47,20 @@ private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, S kafkaAvroDeserializer.configure(config, false); return kafkaAvroDeserializer; } + + private static AWSKafkaAvroDeserializer getAWSDeserializer(String schemaName, String schemaRegistryName, String awsRegion) { + + final var props = new HashMap(); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName()); + + props.put(AWSSchemaRegistryConstants.AWS_REGION, awsRegion); + props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistryName); + props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, schemaName); + props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); + + final var deserializer = new AWSKafkaAvroDeserializer(); + deserializer.configure(props, false); + return deserializer; + } }