
Commit 45d6fce

Author: Karthik Puttaswamy (committed)
Bug fix to handle message indices in proto data
1 parent 8f70e83 commit 45d6fce

10 files changed, +140 -67 lines changed

examples/powertools-examples-kafka/src/main/proto/KeyMessage.proto

Whitespace-only changes.

examples/powertools-examples-kafka/src/main/proto/ValueMessage.proto

Whitespace-only changes.

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializer.java

Lines changed: 51 additions & 11 deletions
@@ -41,6 +41,13 @@
 abstract class AbstractKafkaDeserializer implements PowertoolsDeserializer {
     protected static final ObjectMapper objectMapper = new ObjectMapper()
             .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    private static final Integer GLUE_SCHEMA_ID_LENGTH = 16;
+
+    public enum SchemaRegistryType {
+        CONFLUENT,
+        GLUE,
+        NONE
+    }
 
     /**
      * Deserialize JSON from InputStream into ConsumerRecords
@@ -170,8 +177,8 @@ private <K, V> ConsumerRecord<K, V> convertToConsumerRecord(
             Class<K> keyType,
             Class<V> valueType) {
 
-        K key = deserializeField(eventRecord.getKey(), keyType, "key");
-        V value = deserializeField(eventRecord.getValue(), valueType, "value");
+        K key = deserializeField(eventRecord.getKey(), keyType, "key", extractSchemaRegistryType(eventRecord));
+        V value = deserializeField(eventRecord.getValue(), valueType, "value", extractSchemaRegistryType(eventRecord));
         Headers headers = extractHeaders(eventRecord);
 
         return new ConsumerRecord<>(
@@ -190,14 +197,14 @@ private <K, V> ConsumerRecord<K, V> convertToConsumerRecord(
                 Optional.empty());
     }
 
-    private <T> T deserializeField(String encodedData, Class<T> type, String fieldName) {
+    private <T> T deserializeField(String encodedData, Class<T> type, String fieldName, SchemaRegistryType schemaRegistryType) {
         if (encodedData == null) {
             return null;
         }
 
         try {
             byte[] decodedBytes = Base64.getDecoder().decode(encodedData);
-            return deserialize(decodedBytes, type);
+            return deserialize(decodedBytes, type, schemaRegistryType);
         } catch (Exception e) {
             throw new RuntimeException("Failed to deserialize Kafka record " + fieldName + ".", e);
         }
@@ -218,36 +225,69 @@ private Headers extractHeaders(KafkaEvent.KafkaEventRecord eventRecord) {
         return headers;
     }
 
+    private String extractKeySchemaId(KafkaEvent.KafkaEventRecord eventRecord) {
+        if (eventRecord.getKeySchemaMetadata() != null) {
+            return eventRecord.getKeySchemaMetadata().getSchemaId();
+        }
+        return null;
+    }
+
+    private String extractValueSchemaId(KafkaEvent.KafkaEventRecord eventRecord) {
+        if (eventRecord.getValueSchemaMetadata() != null) {
+            return eventRecord.getValueSchemaMetadata().getSchemaId();
+        }
+        return null;
+    }
+
+    // The Assumption is that there will always be only one schema registry used, either Glue or Confluent, for both key
+    // and value.
+    protected SchemaRegistryType extractSchemaRegistryType(KafkaEvent.KafkaEventRecord eventRecord) {
+
+        String schemaId = extractValueSchemaId(eventRecord);
+        if (schemaId == null) {
+            schemaId = extractKeySchemaId(eventRecord);
+        }
+
+        if (schemaId == null) {
+            return SchemaRegistryType.NONE;
+        }
+
+        return schemaId.length() == GLUE_SCHEMA_ID_LENGTH ? SchemaRegistryType.GLUE : SchemaRegistryType.CONFLUENT;
+    }
+
     /**
      * Template method to be implemented by subclasses for specific deserialization logic
-     * for complex types (non-primitives).
-     *
+     * for complex types (non-primitives) and for specific Schema Registry type.
+     *
      * @param <T> The type to deserialize to
      * @param data The byte array to deserialize coming from the base64 decoded Kafka field
      * @param type The class type to deserialize to
+     * @param schemaRegistryType Schema Registry type
      * @return The deserialized object
      * @throws IOException If deserialization fails
      */
-    protected abstract <T> T deserializeObject(byte[] data, Class<T> type) throws IOException;
+    protected abstract <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException;
 
     /**
-     * Main deserialize method that handles primitive types and delegates to subclasses for complex types.
-     *
+     * Main deserialize method that handles primitive types and delegates to subclasses for complex types and
+     * for specific Schema Registry type.
+     *
      * @param <T> The type to deserialize to
      * @param data The byte array to deserialize
     * @param type The class type to deserialize to
+     * @param schemaRegistryType Schema Registry type
     * @return The deserialized object
     * @throws IOException If deserialization fails
     */
-    private <T> T deserialize(byte[] data, Class<T> type) throws IOException {
+    private <T> T deserialize(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
        // First try to deserialize as a primitive type
        T result = deserializePrimitive(data, type);
        if (result != null) {
            return result;
        }
 
        // Delegate to subclass for complex type deserialization
-       return deserializeObject(data, type);
+       return deserializeObject(data, type, schemaRegistryType);
    }
 
    /**

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializer.java

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 public class KafkaAvroDeserializer extends AbstractKafkaDeserializer {
 
     @Override
-    protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
+    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
         // If no Avro generated class is passed we cannot deserialize using Avro
         if (SpecificRecordBase.class.isAssignableFrom(type)) {
             try {

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializer.java

Lines changed: 2 additions & 1 deletion
@@ -14,14 +14,15 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.rmi.UnexpectedException;
 
 /**
  * Deserializer for Kafka records using JSON format.
  */
 public class KafkaJsonDeserializer extends AbstractKafkaDeserializer {
 
     @Override
-    protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
+    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
         String decodedStr = new String(data, StandardCharsets.UTF_8);
 
         return objectMapper.readValue(decodedStr, type);

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java

Lines changed: 49 additions & 38 deletions
@@ -13,7 +13,9 @@
 package software.amazon.lambda.powertools.kafka.serializers;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import org.apache.kafka.common.utils.ByteUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,18 +38,16 @@ public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {
 
     @Override
     @SuppressWarnings("unchecked")
-    protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
-        // If no Protobuf generated class is passed we cannot deserialize using Protobuf
+    protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
+        // If no Protobuf generated class is passed, we cannot deserialize using Protobuf
         if (Message.class.isAssignableFrom(type)) {
-            try {
-                // Get the parser from the generated Protobuf class
-                Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
-
-                // Try to deserialize the data, handling potential Confluent message indices
-                Message message = deserializeWithMessageIndexHandling(data, parser);
-                return type.cast(message);
-            } catch (Exception e) {
-                throw new IOException("Failed to deserialize Protobuf data.", e);
+            switch (schemaRegistryType) {
+                case GLUE:
+                    return glueDeserializer(data, type);
+                case CONFLUENT:
+                    return confluentDeserializer(data, type);
+                default:
+                    return defaultDeserializer(data, type);
             }
         } else {
             throw new IOException("Unsupported type for Protobuf deserialization: " + type.getName() + ". "
@@ -56,44 +56,55 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
         }
     }
 
-    private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
+    private <T> T defaultDeserializer(byte[] data, Class<T> type) throws IOException {
         try {
-            LOGGER.debug("Attempting to deserialize as standard protobuf data");
-            return parser.parseFrom(data);
+            LOGGER.info("Using default Protobuf deserializer");
+            Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
+            Message message = parser.parseFrom(data);
+            return type.cast(message);
         } catch (Exception e) {
-            LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
-            return deserializeWithMessageIndex(data, parser);
+            throw new IOException("Failed to deserialize Protobuf data.", e);
         }
     }
 
-    private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
-        CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
-
+    private <T> T confluentDeserializer(byte[] data, Class<T> type) throws IOException {
         try {
-            // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
-            // Read the first varint - this could be:
-            // 1. A single 0 (simple case - first message type)
-            // 2. The length of the message index array (complex case)
-            int firstValue = codedInputStream.readUInt32();
 
-            if (firstValue == 0) {
-                // Simple case: Single 0 byte means first message type
-                LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
-                return parser.parseFrom(codedInputStream);
-            } else {
-                // Complex case: firstValue is the length of the message index array
-                LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
-                        firstValue, firstValue);
-                for (int i = 0; i < firstValue; i++) {
-                    codedInputStream.readUInt32(); // Skip each message index value
+            LOGGER.info("Using confluentDeserializer");
+            Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
+            ByteBuffer buffer = ByteBuffer.wrap(data);
+            int size = ByteUtils.readVarint(buffer);
+
+            // Only if the size is greater than zero, continue reading varInt. based on Glue Proto deserializer implementation
+            // Ref: https://tiny.amazon.com/1ru6rz8z/codeamazpackMaveblob1963thir
+            if (size > 0) {
+                for (int i = 0; i < size; i++) {
+                    ByteUtils.readVarint(buffer);
                 }
-                // Now the remaining data should be the actual protobuf message
-                LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
-                return parser.parseFrom(codedInputStream);
             }
+            Message message = parser.parseFrom(buffer);
+            return type.cast(message);
+        } catch (Exception e) {
+            throw new IOException("Failed to deserialize Protobuf data.", e);
+        }
+    }
+
+
+    private <T> T glueDeserializer(byte[] data, Class<T> type) throws IOException {
+        try {
+
+            LOGGER.info("Using glueDeserializer");
+            CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
+            Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
+
+            // Seek one byte forward. Based on Glue Proto deserializer implementation
+            // Ref: https://tiny.amazon.com/1c9cadl8g/codeamazpackMaveblobcf94thir
+            codedInputStream.readUInt32();
 
+            Message message = parser.parseFrom(codedInputStream);
+            return type.cast(message);
         } catch (Exception e) {
-            throw new IOException("Failed to parse protobuf data with or without message index", e);
+            throw new IOException("Failed to deserialize Protobuf data.", e);
         }
     }
 }
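
For context, confluentDeserializer above assumes the Confluent wire format places a varint count of message indexes, followed by that many varint index values, in front of the serialized Protobuf payload, while glueDeserializer skips a single leading varint. Below is a minimal sketch of the index-skipping step using the same ByteUtils calls; the byte values and resulting offset are illustrative only.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.ByteUtils;

    // Hypothetical sketch (not part of the commit): skipping a Confluent message-index
    // prefix with the same ByteUtils calls used in confluentDeserializer above.
    public class MessageIndexPrefixSketch {
        public static void main(String[] args) {
            // Assume a producer wrote message indexes [1] (the second top-level message in
            // the .proto file) in front of the serialized Protobuf payload.
            ByteBuffer buffer = ByteBuffer.allocate(32);
            ByteUtils.writeVarint(1, buffer); // count of indexes
            ByteUtils.writeVarint(1, buffer); // the single index value
            buffer.put(new byte[] { 0x0A, 0x03, 0x66, 0x6F, 0x6F }); // stand-in Protobuf bytes
            buffer.flip();

            // Same skipping logic as confluentDeserializer: read the count, discard that
            // many varints, and the buffer then points at the start of the Protobuf message.
            int size = ByteUtils.readVarint(buffer);
            for (int i = 0; i < size; i++) {
                ByteUtils.readVarint(buffer);
            }
            System.out.println("Protobuf payload starts at offset " + buffer.position()); // 2 here
        }
    }

The single-0 fast path (first message type in the .proto file) still works with this loop, because a count of 0 means there are no index varints left to skip before parsing.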

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/AbstractKafkaDeserializerTest.java

Lines changed: 1 addition & 1 deletion
@@ -462,7 +462,7 @@ void shouldThrowExceptionWhenConvertingEmptyStringToChar(InputType inputType) {
     // Test implementation of AbstractKafkaDeserializer
     private static class TestDeserializer extends AbstractKafkaDeserializer {
         @Override
-        protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
+        protected <T> T deserializeObject(byte[] data, Class<T> type, SchemaRegistryType schemaRegistryType) throws IOException {
             return objectMapper.readValue(data, type);
         }
     }

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaAvroDeserializerTest.java

Lines changed: 3 additions & 3 deletions
@@ -38,7 +38,7 @@ void shouldThrowExceptionWhenTypeIsNotAvroSpecificRecord() {
         byte[] data = new byte[] { 1, 2, 3 };
 
         // When/Then
-        assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class))
+        assertThatThrownBy(() -> deserializer.deserializeObject(data, String.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
                 .isInstanceOf(IOException.class)
                 .hasMessageContaining("Unsupported type for Avro deserialization");
     }
@@ -50,7 +50,7 @@ void shouldDeserializeValidAvroData() throws IOException {
         byte[] avroData = serializeAvro(product);
 
         // When
-        TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class);
+        TestProduct result = deserializer.deserializeObject(avroData, TestProduct.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE);
 
         // Then
         assertThat(result).isNotNull();
@@ -65,7 +65,7 @@ void shouldThrowExceptionWhenDeserializingInvalidAvroData() {
         byte[] invalidAvroData = new byte[] { 1, 2, 3, 4, 5 };
 
         // When/Then
-        assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class))
+        assertThatThrownBy(() -> deserializer.deserializeObject(invalidAvroData, TestProduct.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
                 .isInstanceOf(IOException.class)
                 .hasMessageContaining("Failed to deserialize Avro data");
     }

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaJsonDeserializerTest.java

Lines changed: 2 additions & 2 deletions
@@ -42,7 +42,7 @@ void shouldThrowExceptionWhenTypeIsNotSupportedForJson() {
         byte[] data = new byte[] { 1, 2, 3 };
 
         // When/Then
-        assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class))
+        assertThatThrownBy(() -> deserializer.deserializeObject(data, Object.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE))
                 .isInstanceOf(JsonParseException.class);
     }
 
@@ -53,7 +53,7 @@ void shouldDeserializeValidJsonData() throws IOException {
         byte[] jsonData = objectMapper.writeValueAsBytes(product);
 
         // When
-        TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class);
+        TestProductPojo result = deserializer.deserializeObject(jsonData, TestProductPojo.class, AbstractKafkaDeserializer.SchemaRegistryType.NONE);
 
         // Then
         assertThat(result).isNotNull();
