Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"timestamp": 1545084650988,
"timestampType": "CREATE_TIME",
"key": "NDI=",
"value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
"value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
Expand All @@ -39,7 +39,7 @@
"timestamp": 1545084650989,
"timestampType": "CREATE_TIME",
"key": null,
"value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
"value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
"headers": [
{
"headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
Expand Down
9 changes: 8 additions & 1 deletion examples/powertools-examples-kafka/tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ The tool will output base64-encoded values for Avro products that can be used in
mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
```

The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`.
The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message-indexes to test different serialization scenarios.

## Output

Expand All @@ -55,6 +55,13 @@ Each generator produces:
2. An integer key (42) and one entry with a nullish key to test for edge-cases
3. A complete sample event structure that can be used directly for testing

The Protobuf generators additionally create samples with different Confluent message-index formats:
- Standard protobuf (no message indexes)
- Simple message index (single 0 byte)
- Complex message index (length-prefixed array)

For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).

## Example

After generating the samples, you can copy the output into the respective event files:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,73 +1,106 @@
package org.demo.kafka.tools;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.demo.kafka.protobuf.ProtobufProduct;

import com.google.protobuf.CodedOutputStream;

/**
* Utility class to generate base64-encoded Protobuf serialized products
* for use in test events.
*/
public class GenerateProtobufSamples {

Check failure on line 15 in examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java

View workflow job for this annotation

GitHub Actions / pmd_analyse

This utility class has a non-private constructor

For classes that only have static methods, consider making them utility classes. Note that this doesn't apply to abstract classes, since their subclasses may well include non-static methods. Also, if you want this class to be a utility class, remember to add a private constructor to prevent instantiation. (Note, that this use was known before PMD 5.1.0 as UseSingleton). UseUtilityClass (Priority: 1, Ruleset: Design) https://docs.pmd-code.org/pmd-doc-7.14.0/pmd_rules_java_design.html#useutilityclass

public static void main(String[] args) throws IOException {
// Create three different products
ProtobufProduct product1 = ProtobufProduct.newBuilder()
// Create a single product that will be used for all three scenarios
ProtobufProduct product = ProtobufProduct.newBuilder()
.setId(1001)
.setName("Laptop")
.setPrice(999.99)
.build();

ProtobufProduct product2 = ProtobufProduct.newBuilder()
.setId(1002)
.setName("Smartphone")
.setPrice(599.99)
.build();

ProtobufProduct product3 = ProtobufProduct.newBuilder()
.setId(1003)
.setName("Headphones")
.setPrice(149.99)
.build();

// Serialize and encode each product
String encodedProduct1 = serializeAndEncode(product1);
String encodedProduct2 = serializeAndEncode(product2);
String encodedProduct3 = serializeAndEncode(product3);
// Create three different serializations of the same product
String standardProduct = serializeAndEncode(product);
String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
String productWithComplexIndex = serializeWithComplexMessageIndex(product);

// Serialize and encode an integer key
// Serialize and encode an integer key (same for all records)
String encodedKey = serializeAndEncodeInteger(42);

// Print the results
System.out.println("Base64 encoded Protobuf products for use in kafka-protobuf-event.json:");
System.out.println("\nProduct 1 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct1 + "\",");

System.out.println("\nProduct 2 (with key):");
System.out.println("key: \"" + encodedKey + "\",");
System.out.println("value: \"" + encodedProduct2 + "\",");

System.out.println("\nProduct 3 (without key):");
System.out.println("key: null,");
System.out.println("value: \"" + encodedProduct3 + "\",");

// Print a sample event structure
System.out.println("\nSample event structure:");
printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
System.out.println("\n1. Standard Protobuf (no message index):");
System.out.println("value: \"" + standardProduct + "\"");

System.out.println("\n2. Simple Message Index (single 0):");
System.out.println("value: \"" + productWithSimpleIndex + "\"");

System.out.println("\n3. Complex Message Index (array [1,0]):");
System.out.println("value: \"" + productWithComplexIndex + "\"");

// Print the merged event structure
System.out.println("\n" + "=".repeat(80));
System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
System.out.println("=".repeat(80));
printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
}

private static String serializeAndEncode(ProtobufProduct product) {
    // Plain protobuf bytes with no Confluent framing, then base64 for embedding in JSON.
    byte[] raw = product.toByteArray();
    return Base64.getEncoder().encodeToString(raw);
}

/**
 * Serializes a protobuf product with a simple Confluent message index (single 0).
 * Format: [0][protobuf_data]
 *
 * @param product the product to serialize
 * @return base64 encoding of: varint 0 followed by the product's protobuf bytes
 * @throws IOException if writing to the in-memory stream fails
 * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
 */
private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);

    // Write simple message index (single 0) — denotes the first message type in the schema
    codedOutput.writeUInt32NoTag(0);

    // Write the protobuf data
    product.writeTo(codedOutput);

    // Flush before reading baos: CodedOutputStream buffers internally
    codedOutput.flush();
    return Base64.getEncoder().encodeToString(baos.toByteArray());
}

/**
 * Serializes a protobuf product with a complex Confluent message index (array [1,0]).
 * Format: [2][1][0][protobuf_data] where 2 is the array length
 *
 * @param product the product to serialize
 * @return base64 encoding of: varint array length, the index varints, then the product's protobuf bytes
 * @throws IOException if writing to the in-memory stream fails
 * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
 */
private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);

    // Write complex message index array [1,0]
    codedOutput.writeUInt32NoTag(2); // Array length
    codedOutput.writeUInt32NoTag(1); // First index value
    codedOutput.writeUInt32NoTag(0); // Second index value

    // Write the protobuf data
    product.writeTo(codedOutput);

    // Flush before reading baos: CodedOutputStream buffers internally
    codedOutput.flush();
    return Base64.getEncoder().encodeToString(baos.toByteArray());
}

/**
 * Serializes an integer key as the base64 of its decimal string form (e.g. 42 -> "NDI=").
 *
 * @param value the record key to encode
 * @return base64 encoding of the key's UTF-8 string representation
 */
private static String serializeAndEncodeInteger(Integer value) {
    // For simple types like integers, we'll just convert to string and encode.
    // Specify the charset explicitly: getBytes() without one uses the platform default,
    // which can differ between machines and silently change the generated samples.
    return Base64.getEncoder().encodeToString(value.toString().getBytes(StandardCharsets.UTF_8));
}

private static void printSampleEvent(String key, String product1, String product2, String product3) {
private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
String complexIndexProduct) {
System.out.println("{\n" +
" \"eventSource\": \"aws:kafka\",\n" +
" \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
Expand All @@ -83,7 +116,7 @@
" \"timestamp\": 1545084650987,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product1 + "\",\n" +
" \"value\": \"" + standardProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
Expand All @@ -97,7 +130,7 @@
" \"timestamp\": 1545084650988,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": \"" + key + "\",\n" +
" \"value\": \"" + product2 + "\",\n" +
" \"value\": \"" + simpleIndexProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
Expand All @@ -111,7 +144,7 @@
" \"timestamp\": 1545084650989,\n" +
" \"timestampType\": \"CREATE_TIME\",\n" +
" \"key\": null,\n" +
" \"value\": \"" + product3 + "\",\n" +
" \"value\": \"" + complexIndexProduct + "\",\n" +
" \"headers\": [\n" +
" {\n" +
" \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,27 @@
package software.amazon.lambda.powertools.kafka.serializers;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;

/**
 * Deserializer for Kafka records using Protocol Buffers format.
 * Supports both standard protobuf serialization and Confluent Schema Registry serialization using message indices.
 *
 * For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped
 * by the Kafka ESM, leaving only the message index (if present) and protobuf data.
 *
 * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
 */
public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class);

@Override
@SuppressWarnings("unchecked")
protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
Expand All @@ -29,7 +42,9 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
try {
// Get the parser from the generated Protobuf class
Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
Message message = parser.parseFrom(data);

// Try to deserialize the data, handling potential Confluent message indices
Message message = deserializeWithMessageIndexHandling(data, parser);
return type.cast(message);
} catch (Exception e) {
throw new IOException("Failed to deserialize Protobuf data.", e);
Expand All @@ -40,4 +55,45 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
+ "Consider using an alternative Deserializer.");
}
}

private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
    // Fast path: treat the payload as a plain protobuf message. Only when that fails do we
    // assume a Confluent message-index prefix and retry with it stripped.
    Message parsed;
    LOGGER.debug("Attempting to deserialize as standard protobuf data");
    try {
        parsed = parser.parseFrom(data);
    } catch (Exception notPlainProtobuf) {
        LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
        parsed = deserializeWithMessageIndex(data, parser);
    }
    return parsed;
}

private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
CodedInputStream codedInputStream = CodedInputStream.newInstance(data);

try {
// https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// Read the first varint - this could be:
// 1. A single 0 (simple case - first message type)
// 2. The length of the message index array (complex case)
int firstValue = codedInputStream.readUInt32();

if (firstValue == 0) {
// Simple case: Single 0 byte means first message type
LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
return parser.parseFrom(codedInputStream);
} else {
// Complex case: firstValue is the length of the message index array
LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
firstValue, firstValue);
for (int i = 0; i < firstValue; i++) {
codedInputStream.readUInt32(); // Skip each message index value
}
Comment on lines +87 to +89

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will clip multiple bytes which might be part of the actual data as well. We have seen cases for confluent serialized proto messages where first byte is not the length but index of the message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I will update to reading varint and also handle Glue bytes as an additional edge-case.

// Now the remaining data should be the actual protobuf message
LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
return parser.parseFrom(codedInputStream);
}

} catch (Exception e) {
throw new IOException("Failed to parse protobuf data with or without message index", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.google.protobuf.CodedOutputStream;

import software.amazon.lambda.powertools.kafka.serializers.test.protobuf.TestProduct;

class KafkaProtobufDeserializerTest {
Expand Down Expand Up @@ -72,4 +75,78 @@ void shouldThrowExceptionWhenDeserializingInvalidProtobufData() {
.isInstanceOf(IOException.class)
.hasMessageContaining("Failed to deserialize Protobuf data");
}

@Test
void shouldDeserializeProtobufDataWithSimpleMessageIndex() throws IOException {
    // Given: a product framed with the simple Confluent message index (a single 0 varint)
    TestProduct original = TestProduct.newBuilder()
            .setId(456)
            .setName("Simple Index Product")
            .setPrice(199.99)
            .build();
    byte[] framedData = createProtobufDataWithSimpleMessageIndex(original);

    // When
    TestProduct result = deserializer.deserializeObject(framedData, TestProduct.class);

    // Then: the payload round-trips unchanged
    assertThat(result).isNotNull();
    assertThat(result.getId()).isEqualTo(456);
    assertThat(result.getName()).isEqualTo("Simple Index Product");
    assertThat(result.getPrice()).isEqualTo(199.99);
}

@Test
void shouldDeserializeProtobufDataWithComplexMessageIndex() throws IOException {
    // Given: a product framed with the complex Confluent message index (array [1,0])
    TestProduct original = TestProduct.newBuilder()
            .setId(789)
            .setName("Complex Index Product")
            .setPrice(299.99)
            .build();
    byte[] framedData = createProtobufDataWithComplexMessageIndex(original);

    // When
    TestProduct result = deserializer.deserializeObject(framedData, TestProduct.class);

    // Then: the payload round-trips unchanged
    assertThat(result).isNotNull();
    assertThat(result.getId()).isEqualTo(789);
    assertThat(result.getName()).isEqualTo("Complex Index Product");
    assertThat(result.getPrice()).isEqualTo(299.99);
}

private byte[] createProtobufDataWithSimpleMessageIndex(TestProduct product) throws IOException {
    // Varint 0 encodes as the single byte 0x00, so the frame can be built directly:
    // [0][protobuf_data] — byte-identical to what CodedOutputStream.writeUInt32NoTag(0) produces.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0); // simple message index: first message type in the schema
    out.write(product.toByteArray());
    return out.toByteArray();
}

private byte[] createProtobufDataWithComplexMessageIndex(TestProduct product) throws IOException {
    // Varints below 128 are single bytes, so the [2][1][0] prefix can be written directly —
    // byte-identical to three CodedOutputStream.writeUInt32NoTag calls.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(2); // index-array length
    out.write(1); // first index value
    out.write(0); // second index value
    out.write(product.toByteArray());
    return out.toByteArray();
}
}
Loading