2 changes: 1 addition & 1 deletion examples/powertools-examples-serialization/pom.xml
@@ -31,7 +31,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>3.15.0</version>
<version>3.16.0</version>
</dependency>
</dependencies>

2 changes: 1 addition & 1 deletion pom.xml
@@ -89,7 +89,7 @@
<payloadoffloading-common.version>2.2.0</payloadoffloading-common.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<lambda.core.version>1.2.3</lambda.core.version>
<lambda.events.version>3.15.0</lambda.events.version>
<lambda.events.version>3.16.0</lambda.events.version>
<lambda.serial.version>1.1.5</lambda.serial.version>
<maven-compiler-plugin.version>3.13.0</maven-compiler-plugin.version>
<aspectj.version>1.9.7</aspectj.version>
@@ -41,6 +41,13 @@
abstract class AbstractKafkaDeserializer implements PowertoolsDeserializer {
protected static final ObjectMapper objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private static final Integer GLUE_SCHEMA_ID_LENGTH = 16;

public enum SchemaRegistryType {
CONFLUENT,
GLUE,
NONE
}

/**
* Deserialize JSON from InputStream into ConsumerRecords
@@ -170,8 +177,8 @@ private <K, V> ConsumerRecord<K, V> convertToConsumerRecord(
Class<K> keyType,
Class<V> valueType) {

K key = deserializeField(eventRecord.getKey(), keyType, "key");
V value = deserializeField(eventRecord.getValue(), valueType, "value");
K key = deserializeField(eventRecord.getKey(), keyType, "key", extractSchemaRegistryType(eventRecord));
V value = deserializeField(eventRecord.getValue(), valueType, "value", extractSchemaRegistryType(eventRecord));
Headers headers = extractHeaders(eventRecord);

return new ConsumerRecord<>(
@@ -190,14 +197,14 @@ private <K, V> ConsumerRecord<K, V> convertToConsumerRecord(
Optional.empty());
}

private <T> T deserializeField(String encodedData, Class<T> type, String fieldName) {
private <T> T deserializeField(String encodedData, Class<T> type, String fieldName, SchemaRegistryType schemaRegistryType) {
if (encodedData == null) {
return null;
}

try {
byte[] decodedBytes = Base64.getDecoder().decode(encodedData);
return deserialize(decodedBytes, type);
return deserialize(decodedBytes, type, schemaRegistryType);
} catch (Exception e) {
throw new RuntimeException("Failed to deserialize Kafka record " + fieldName + ".", e);
}
@@ -218,36 +225,69 @@ private Headers extractHeaders(KafkaEvent.KafkaEventRecord eventRecord) {
return headers;
}

private String extractKeySchemaId(KafkaEvent.KafkaEventRecord eventRecord) {
if (eventRecord.getKeySchemaMetadata() != null) {
return eventRecord.getKeySchemaMetadata().getSchemaId();
}
return null;
}

private String extractValueSchemaId(KafkaEvent.KafkaEventRecord eventRecord) {
if (eventRecord.getValueSchemaMetadata() != null) {
return eventRecord.getValueSchemaMetadata().getSchemaId();
}
return null;
}

// The assumption is that only one schema registry, either Glue or Confluent, is used for both the key
// and the value.
protected SchemaRegistryType extractSchemaRegistryType(KafkaEvent.KafkaEventRecord eventRecord) {

String schemaId = extractValueSchemaId(eventRecord);
if (schemaId == null) {
schemaId = extractKeySchemaId(eventRecord);
}

if (schemaId == null) {
return SchemaRegistryType.NONE;
}

return schemaId.length() == GLUE_SCHEMA_ID_LENGTH ? SchemaRegistryType.GLUE : SchemaRegistryType.CONFLUENT;
}
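Note: a minimal, self-contained sketch (not part of the PR; the class name and sample IDs are hypothetical) of how the heuristic above classifies schema IDs. It assumes, as GLUE_SCHEMA_ID_LENGTH implies, that a Glue schema ID arrives as a 16-character string while a Confluent schema ID is a shorter numeric string.

public class SchemaRegistryTypeHeuristicDemo {
    public static void main(String[] args) {
        System.out.println(classify(null));               // NONE - record carries no schema metadata
        System.out.println(classify("1234"));             // CONFLUENT - short numeric ID
        System.out.println(classify("0123456789abcdef")); // GLUE - exactly 16 characters
    }

    private static String classify(String schemaId) {
        // Same decision logic as extractSchemaRegistryType(), reduced to plain strings.
        if (schemaId == null) {
            return "NONE";
        }
        return schemaId.length() == 16 ? "GLUE" : "CONFLUENT";
    }
}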

/**
* Template method to be implemented by subclasses for specific deserialization logic
* for complex types (non-primitives).
*
* for complex types (non-primitives) and for the specific Schema Registry type.
*
* @param <T> The type to deserialize to
* @param data The byte array to deserialize coming from the base64 decoded Kafka field
* @param type The class type to deserialize to
* @param schemaRegistryType Schema Registry type
* @return The deserialized object
* @throws IOException If deserialization fails
*/
protected abstract <T> T deserializeObject(byte[] data, Class<T> type) throws IOException;
protected abstract <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException;

/**
* Main deserialize method that handles primitive types and delegates to subclasses for complex types.
*
* Main deserialize method that handles primitive types and delegates to subclasses for complex types and
* for the specific Schema Registry type.
*
* @param <T> The type to deserialize to
* @param data The byte array to deserialize
* @param type The class type to deserialize to
* @param schemaRegistryType Schema Registry type
* @return The deserialized object
* @throws IOException If deserialization fails
*/
private <T> T deserialize(byte[] data, Class<T> type) throws IOException {
private <T> T deserialize(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
// First try to deserialize as a primitive type
T result = deserializePrimitive(data, type);
if (result != null) {
return result;
}

// Delegate to subclass for complex type deserialization
return deserializeObject(data, type);
return deserializeObject(data, type, schemaRegistryType);
}

/**
@@ -26,7 +26,7 @@
public class KafkaAvroDeserializer extends AbstractKafkaDeserializer {

@Override
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
// If no Avro generated class is passed we cannot deserialize using Avro
if (SpecificRecordBase.class.isAssignableFrom(type)) {
try {
@@ -14,14 +14,15 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.rmi.UnexpectedException;

/**
* Deserializer for Kafka records using JSON format.
*/
public class KafkaJsonDeserializer extends AbstractKafkaDeserializer {

@Override
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
String decodedStr = new String(data, StandardCharsets.UTF_8);

return objectMapper.readValue(decodedStr, type);
@@ -13,7 +13,9 @@
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.ByteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -36,18 +38,16 @@ public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {

@Override
@SuppressWarnings("unchecked")
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
// If no Protobuf generated class is passed we cannot deserialize using Protobuf
protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
// If no Protobuf generated class is passed, we cannot deserialize using Protobuf
if (Message.class.isAssignableFrom(type)) {
try {
// Get the parser from the generated Protobuf class
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);

// Try to deserialize the data, handling potential Confluent message indices
Message message = deserializeWithMessageIndexHandling(data, parser);
return type.cast(message);
} catch (Exception e) {
throw new IOException("Failed to deserialize Protobuf data.", e);
switch (schemaRegistryType) {
case GLUE:
return glueDeserializer(data, type);
case CONFLUENT:
return confluentDeserializer(data, type);
default:
return defaultDeserializer(data, type);
}
} else {
throw new IOException("Unsupported type for Protobuf deserialization: " + type.getName() + ". "
@@ -56,44 +56,53 @@ protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType
}
}

private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
private <T> T defaultDeserializer(byte[] data, Class<T> type) throws IOException {
try {
LOGGER.debug("Attempting to deserialize as standard protobuf data");
return parser.parseFrom(data);
LOGGER.debug("Using default Protobuf deserializer");
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
Message message = parser.parseFrom(data);
return type.cast(message);
} catch (Exception e) {
LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
return deserializeWithMessageIndex(data, parser);
throw new IOException("Failed to deserialize Protobuf data.", e);
}
}

private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
CodedInputStream codedInputStream = CodedInputStream.newInstance(data);

private <T> T confluentDeserializer(byte[] data, Class<T> type) throws IOException {
try {
// https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// Read the first varint - this could be:
// 1. A single 0 (simple case - first message type)
// 2. The length of the message index array (complex case)
int firstValue = codedInputStream.readUInt32();

if (firstValue == 0) {
// Simple case: Single 0 byte means first message type
LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
return parser.parseFrom(codedInputStream);
} else {
// Complex case: firstValue is the length of the message index array
LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
firstValue, firstValue);
for (int i = 0; i < firstValue; i++) {
codedInputStream.readUInt32(); // Skip each message index value
LOGGER.debug("Using Confluent Deserializer");
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
ByteBuffer buffer = ByteBuffer.wrap(data);
int size = ByteUtils.readVarint(buffer);

// Only if the size is greater than zero, continue reading varints, based on the Glue Proto deserializer implementation.
if (size > 0) {
for (int i = 0; i < size; i++) {
ByteUtils.readVarint(buffer);
}
// Now the remaining data should be the actual protobuf message
LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
return parser.parseFrom(codedInputStream);
}
Message message = parser.parseFrom(buffer);
return type.cast(message);
} catch (Exception e) {
throw new IOException("Failed to deserialize Protobuf data.", e);
}
}


private <T> T glueDeserializer(byte[] data, Class<T> type) throws IOException {
try {

LOGGER.debug("Using Glue Deserializer");
CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);

// Seek one byte forward, based on the Glue Proto deserializer implementation.
codedInputStream.readUInt32();

Message message = parser.parseFrom(codedInputStream);
return type.cast(message);
} catch (Exception e) {
throw new IOException("Failed to parse protobuf data with or without message index", e);
throw new IOException("Failed to deserialize Protobuf data.", e);
}
}
}
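Note: a hypothetical test helper (not part of the PR) that frames an already serialized Protobuf message the way confluentDeserializer() above reads it back: a varint count of message indexes, the indexes themselves as varints, then the raw Protobuf bytes. It mirrors the read path built on ByteUtils.readVarint and assumes ByteUtils.writeVarint is the matching encoder from the same Kafka utility class.

import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.ByteUtils;

final class ConfluentFramingSketch {

    static byte[] frameWithMessageIndexes(byte[] protobufBytes, int... messageIndexes) {
        // 5 bytes is the maximum encoded size of a single int varint.
        ByteBuffer buffer = ByteBuffer.allocate(protobufBytes.length + 5 * (messageIndexes.length + 1));
        ByteUtils.writeVarint(messageIndexes.length, buffer); // count; 0 means "first message type"
        for (int index : messageIndexes) {
            ByteUtils.writeVarint(index, buffer);             // each message index
        }
        buffer.put(protobufBytes);                            // actual Protobuf payload
        byte[] framed = new byte[buffer.position()];
        buffer.flip();
        buffer.get(framed);
        return framed;
    }
}

A test could then pass such a framed payload to deserializeObject(framed, SomeProto.class, SchemaRegistryType.CONFLUENT), where SomeProto stands in for a generated Protobuf class, and expect the original message back.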
@@ -460,9 +460,9 @@
}

// Test implementation of AbstractKafkaDeserializer
private static class TestDeserializer extends AbstractKafkaDeserializer {

@Override
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
return objectMapper.readValue(data, type);
}
}
@@ -38,7 +38,7 @@ void shouldThrowExceptionWhenTypeIsNotAvroSpecificRecord() {
byte[] data = new byte[] { 1, 2, 3 };

// When/Then
assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class))
assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
.isInstanceOf(IOException.class)
.hasMessageContaining("Unsupported type for Avro deserialization");
}
@@ -50,7 +50,7 @@ void shouldDeserializeValidAvroData() throws IOException {
byte[] avroData = serializeAvro(product);

// When
TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class);
TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE);

// Then
assertThat(result).isNotNull();
@@ -65,7 +65,7 @@ void shouldThrowExceptionWhenDeserializingInvalidAvroData() {
byte[] invalidAvroData = new byte[] { 1, 2, 3, 4, 5 };

// When/Then
assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class))
assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
.isInstanceOf(IOException.class)
.hasMessageContaining("Failed to deserialize Avro data");
}
@@ -42,7 +42,7 @@ void shouldThrowExceptionWhenTypeIsNotSupportedForJson() {
byte[] data = new byte[] { 1, 2, 3 };

// When/Then
assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class))
assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
.isInstanceOf(JsonParseException.class);
}

@@ -53,7 +53,7 @@ void shouldDeserializeValidJsonData() throws IOException {
byte[] jsonData = objectMapper.writeValueAsBytes(product);

// When
TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class);
TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE);

// Then
assertThat(result).isNotNull();