
Commit 5ec8b5f

fix(kafka): Add support for confluent message indices.

1 parent eebc06a

5 files changed: +216 -43 lines changed


examples/powertools-examples-kafka/events/kafka-protobuf-event.json
Lines changed: 2 additions & 2 deletions

@@ -25,7 +25,7 @@
         "timestamp": 1545084650988,
         "timestampType": "CREATE_TIME",
         "key": "NDI=",
-        "value": "COoHEgpTbWFydHBob25lGVK4HoXrv4JA",
+        "value": "AAjpBxIGTGFwdG9wGVK4HoXrP49A",
         "headers": [
           {
             "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
@@ -39,7 +39,7 @@
         "timestamp": 1545084650989,
         "timestampType": "CREATE_TIME",
         "key": null,
-        "value": "COsHEgpIZWFkcGhvbmVzGUjhehSuv2JA",
+        "value": "AgEACOkHEgZMYXB0b3AZUrgehes/j0A=",
         "headers": [
           {
             "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]

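A quick way to confirm what changed in these sample values is to decode them: the first record's new value now starts with the simple Confluent message index (a single 0x00 byte), while the third record's value starts with the length-prefixed array form (leading bytes [2, 1, 0]). A minimal sketch to check this locally (the class name is illustrative, not part of the commit):

```java
import java.util.Arrays;
import java.util.Base64;

// Illustrative helper (not part of the commit): decodes the updated sample
// values and prints their leading bytes to expose the message-index prefix.
public class InspectSamplePrefixes {
    public static void main(String[] args) {
        // Record 1: simple index - a single 0 byte precedes the protobuf payload
        byte[] simple = Base64.getDecoder().decode("AAjpBxIGTGFwdG9wGVK4HoXrP49A");
        // Record 3: complex index - array length 2, then indexes 1 and 0
        byte[] complex = Base64.getDecoder().decode("AgEACOkHEgZMYXB0b3AZUrgehes/j0A=");

        System.out.println(Arrays.toString(Arrays.copyOf(simple, 3)));  // [0, 8, -23]
        System.out.println(Arrays.toString(Arrays.copyOf(complex, 3))); // [2, 1, 0]
    }
}
```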
examples/powertools-examples-kafka/tools/README.md
Lines changed: 8 additions & 1 deletion

@@ -45,7 +45,7 @@ The tool will output base64-encoded values for Avro products that can be used in
 mvn exec:java -Dexec.mainClass="org.demo.kafka.tools.GenerateProtobufSamples"
 ```
 
-The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`.
+The tool will output base64-encoded values for Protobuf products that can be used in `../events/kafka-protobuf-event.json`. This generator creates samples with and without Confluent message indexes to test different serialization scenarios.
 
 ## Output
 
@@ -55,6 +55,13 @@ Each generator produces:
 2. An integer key (42) and one entry with a nullish key to test for edge-cases
 3. A complete sample event structure that can be used directly for testing
 
+The Protobuf generators additionally create samples with different Confluent message-index formats:
+- Standard protobuf (no message indexes)
+- Simple message index (single 0 byte)
+- Complex message index (length-prefixed array)
+
+For more information about Confluent Schema Registry serialization formats and wire format specifications, see the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format).
+
 ## Example
 
 After generating the samples, you can copy the output into the respective event files:
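For orientation: per the linked Confluent docs, the full wire format on the topic is a magic byte (0), a 4-byte schema ID, the message indexes, and then the protobuf payload. The three scenarios above differ only in the message-index portion because, as the deserializer's javadoc below notes, the Lambda ESM has already stripped the magic byte and schema ID. A rough sketch of how the three prefixes are framed (hypothetical class, mirroring the generator's use of CodedOutputStream):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

import com.google.protobuf.CodedOutputStream;

// Hypothetical illustration of the three value framings the generator emits.
public class MessageIndexFraming {
    public static void main(String[] args) throws IOException {
        byte[] payload = { 0x08, 0x01 }; // stand-in protobuf payload (field 1 = 1)

        System.out.println(Arrays.toString(payload));                  // standard: [8, 1]
        System.out.println(Arrays.toString(prefix(payload, 0)));       // simple:   [0, 8, 1]
        System.out.println(Arrays.toString(prefix(payload, 2, 1, 0))); // complex:  [2, 1, 0, 8, 1]
    }

    // Writes the given varints ahead of the payload, as CodedOutputStream does.
    static byte[] prefix(byte[] payload, int... varints) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        CodedOutputStream out = CodedOutputStream.newInstance(baos);
        for (int v : varints) {
            out.writeUInt32NoTag(v);
        }
        out.writeRawBytes(payload);
        out.flush();
        return baos.toByteArray();
    }
}
```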

examples/powertools-examples-kafka/tools/src/main/java/org/demo/kafka/tools/GenerateProtobufSamples.java
Lines changed: 72 additions & 39 deletions

@@ -1,73 +1,106 @@
 package org.demo.kafka.tools;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Base64;
 
 import org.demo.kafka.protobuf.ProtobufProduct;
 
+import com.google.protobuf.CodedOutputStream;
+
 /**
  * Utility class to generate base64-encoded Protobuf serialized products
  * for use in test events.
  */
 public class GenerateProtobufSamples {
 
     public static void main(String[] args) throws IOException {
-        // Create three different products
-        ProtobufProduct product1 = ProtobufProduct.newBuilder()
+        // Create a single product that will be used for all three scenarios
+        ProtobufProduct product = ProtobufProduct.newBuilder()
                 .setId(1001)
                 .setName("Laptop")
                 .setPrice(999.99)
                 .build();
 
-        ProtobufProduct product2 = ProtobufProduct.newBuilder()
-                .setId(1002)
-                .setName("Smartphone")
-                .setPrice(599.99)
-                .build();
-
-        ProtobufProduct product3 = ProtobufProduct.newBuilder()
-                .setId(1003)
-                .setName("Headphones")
-                .setPrice(149.99)
-                .build();
-
-        // Serialize and encode each product
-        String encodedProduct1 = serializeAndEncode(product1);
-        String encodedProduct2 = serializeAndEncode(product2);
-        String encodedProduct3 = serializeAndEncode(product3);
+        // Create three different serializations of the same product
+        String standardProduct = serializeAndEncode(product);
+        String productWithSimpleIndex = serializeWithSimpleMessageIndex(product);
+        String productWithComplexIndex = serializeWithComplexMessageIndex(product);
 
-        // Serialize and encode an integer key
+        // Serialize and encode an integer key (same for all records)
         String encodedKey = serializeAndEncodeInteger(42);
 
         // Print the results
-        System.out.println("Base64 encoded Protobuf products for use in kafka-protobuf-event.json:");
-        System.out.println("\nProduct 1 (with key):");
-        System.out.println("key: \"" + encodedKey + "\",");
-        System.out.println("value: \"" + encodedProduct1 + "\",");
-
-        System.out.println("\nProduct 2 (with key):");
-        System.out.println("key: \"" + encodedKey + "\",");
-        System.out.println("value: \"" + encodedProduct2 + "\",");
-
-        System.out.println("\nProduct 3 (without key):");
-        System.out.println("key: null,");
-        System.out.println("value: \"" + encodedProduct3 + "\",");
-
-        // Print a sample event structure
-        System.out.println("\nSample event structure:");
-        printSampleEvent(encodedKey, encodedProduct1, encodedProduct2, encodedProduct3);
+        System.out.println("Base64 encoded Protobuf products with different message index scenarios:");
+        System.out.println("\n1. Standard Protobuf (no message index):");
+        System.out.println("value: \"" + standardProduct + "\"");
+
+        System.out.println("\n2. Simple Message Index (single 0):");
+        System.out.println("value: \"" + productWithSimpleIndex + "\"");
+
+        System.out.println("\n3. Complex Message Index (array [1,0]):");
+        System.out.println("value: \"" + productWithComplexIndex + "\"");
+
+        // Print the merged event structure
+        System.out.println("\n" + "=".repeat(80));
+        System.out.println("MERGED EVENT WITH ALL THREE SCENARIOS");
+        System.out.println("=".repeat(80));
+        printSampleEvent(encodedKey, standardProduct, productWithSimpleIndex, productWithComplexIndex);
     }
 
     private static String serializeAndEncode(ProtobufProduct product) {
         return Base64.getEncoder().encodeToString(product.toByteArray());
    }
 
+    /**
+     * Serializes a protobuf product with a simple Confluent message index (single 0).
+     * Format: [0][protobuf_data]
+     *
+     * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
+     */
+    private static String serializeWithSimpleMessageIndex(ProtobufProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write simple message index (single 0)
+        codedOutput.writeUInt32NoTag(0);
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return Base64.getEncoder().encodeToString(baos.toByteArray());
+    }
+
+    /**
+     * Serializes a protobuf product with a complex Confluent message index (array [1,0]).
+     * Format: [2][1][0][protobuf_data] where 2 is the array length
+     *
+     * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
+     */
+    private static String serializeWithComplexMessageIndex(ProtobufProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write complex message index array [1,0]
+        codedOutput.writeUInt32NoTag(2); // Array length
+        codedOutput.writeUInt32NoTag(1); // First index value
+        codedOutput.writeUInt32NoTag(0); // Second index value
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return Base64.getEncoder().encodeToString(baos.toByteArray());
+    }
+
     private static String serializeAndEncodeInteger(Integer value) {
         // For simple types like integers, we'll just convert to string and encode
         return Base64.getEncoder().encodeToString(value.toString().getBytes());
     }
 
-    private static void printSampleEvent(String key, String product1, String product2, String product3) {
+    private static void printSampleEvent(String key, String standardProduct, String simpleIndexProduct,
+            String complexIndexProduct) {
         System.out.println("{\n" +
                 "  \"eventSource\": \"aws:kafka\",\n" +
                 "  \"eventSourceArn\": \"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\",\n"
@@ -83,7 +116,7 @@ private static void printSampleEvent(String key, String product1, String product
                 "      \"timestamp\": 1545084650987,\n" +
                 "      \"timestampType\": \"CREATE_TIME\",\n" +
                 "      \"key\": \"" + key + "\",\n" +
-                "      \"value\": \"" + product1 + "\",\n" +
+                "      \"value\": \"" + standardProduct + "\",\n" +
                 "      \"headers\": [\n" +
                 "        {\n" +
                 "          \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -97,7 +130,7 @@ private static void printSampleEvent(String key, String product1, String product
                 "      \"timestamp\": 1545084650988,\n" +
                 "      \"timestampType\": \"CREATE_TIME\",\n" +
                 "      \"key\": \"" + key + "\",\n" +
-                "      \"value\": \"" + product2 + "\",\n" +
+                "      \"value\": \"" + simpleIndexProduct + "\",\n" +
                 "      \"headers\": [\n" +
                 "        {\n" +
                 "          \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +
@@ -111,7 +144,7 @@ private static void printSampleEvent(String key, String product1, String product
                 "      \"timestamp\": 1545084650989,\n" +
                 "      \"timestampType\": \"CREATE_TIME\",\n" +
                 "      \"key\": null,\n" +
-                "      \"value\": \"" + product3 + "\",\n" +
+                "      \"value\": \"" + complexIndexProduct + "\",\n" +
                 "      \"headers\": [\n" +
                 "        {\n" +
                 "          \"headerKey\": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n" +

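One detail worth noting about the framing above: `writeUInt32NoTag` emits protobuf varints, so each index value in these samples occupies a single byte only because the values are below 128. A hypothetical check using protobuf's own size calculator:

```java
import com.google.protobuf.CodedOutputStream;

// Hypothetical check: varint sizes for small vs. large message-index values.
public class VarintSizes {
    public static void main(String[] args) {
        System.out.println(CodedOutputStream.computeUInt32SizeNoTag(0));   // 1 byte
        System.out.println(CodedOutputStream.computeUInt32SizeNoTag(127)); // 1 byte
        System.out.println(CodedOutputStream.computeUInt32SizeNoTag(128)); // 2 bytes
    }
}
```

This is why the deserializer below reads the prefix with `readUInt32` rather than assuming one byte per index value.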
powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializer.java
Lines changed: 57 additions & 1 deletion

@@ -13,14 +13,27 @@
 package software.amazon.lambda.powertools.kafka.serializers;
 
 import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.Message;
 import com.google.protobuf.Parser;
 
 /**
  * Deserializer for Kafka records using Protocol Buffers format.
+ * Supports both standard protobuf serialization and Confluent Schema Registry serialization using message indices.
+ *
+ * For Confluent-serialized data, assumes the magic byte and schema ID have already been stripped
+ * by the Kafka ESM, leaving only the message index (if present) and protobuf data.
+ *
+ * @see <a href="https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format">Confluent wire format</a>
  */
 public class KafkaProtobufDeserializer extends AbstractKafkaDeserializer {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProtobufDeserializer.class);
+
     @Override
     @SuppressWarnings("unchecked")
     protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException {
@@ -29,7 +42,9 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
         try {
             // Get the parser from the generated Protobuf class
             Parser<Message> parser = (Parser<Message>) type.getMethod("parser").invoke(null);
-            Message message = parser.parseFrom(data);
+
+            // Try to deserialize the data, handling potential Confluent message indices
+            Message message = deserializeWithMessageIndexHandling(data, parser);
             return type.cast(message);
         } catch (Exception e) {
             throw new IOException("Failed to deserialize Protobuf data.", e);
@@ -40,4 +55,45 @@ protected <T> T deserializeObject(byte[] data, Class<T> type) throws IOException
                     + "Consider using an alternative Deserializer.");
         }
     }
+
+    private Message deserializeWithMessageIndexHandling(byte[] data, Parser<Message> parser) throws IOException {
+        try {
+            LOGGER.debug("Attempting to deserialize as standard protobuf data");
+            return parser.parseFrom(data);
+        } catch (Exception e) {
+            LOGGER.debug("Standard protobuf parsing failed, attempting Confluent message-index handling");
+            return deserializeWithMessageIndex(data, parser);
+        }
+    }
+
+    private Message deserializeWithMessageIndex(byte[] data, Parser<Message> parser) throws IOException {
+        CodedInputStream codedInputStream = CodedInputStream.newInstance(data);
+
+        try {
+            // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+            // Read the first varint - this could be:
+            // 1. A single 0 (simple case - first message type)
+            // 2. The length of the message index array (complex case)
+            int firstValue = codedInputStream.readUInt32();
+
+            if (firstValue == 0) {
+                // Simple case: a single 0 byte means the first message type
+                LOGGER.debug("Found simple message-index case (single 0), parsing remaining data as protobuf");
+                return parser.parseFrom(codedInputStream);
+            } else {
+                // Complex case: firstValue is the length of the message index array
+                LOGGER.debug("Found complex message-index case with array length: {}, skipping {} message index values",
+                        firstValue, firstValue);
+                for (int i = 0; i < firstValue; i++) {
+                    codedInputStream.readUInt32(); // Skip each message index value
+                }
+                // Now the remaining data should be the actual protobuf message
+                LOGGER.debug("Finished skipping message indexes, parsing remaining data as protobuf");
+                return parser.parseFrom(codedInputStream);
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to parse protobuf data with or without message index", e);
+        }
+    }
 }
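End to end, the new fallback is transparent to handler code. Below is a sketch of a typical consumer, assuming the powertools-kafka `@Deserialization` annotation API and the example project's generated `ProtobufProduct` class; with this commit, values framed with Confluent message indexes deserialize the same as plain protobuf values:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;

// Sketch: handler code is unchanged whether or not the producer used the
// Confluent protobuf serializer (message indexes are handled internally).
public class ProtobufProductHandler implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

    @Override
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
        for (ConsumerRecord<String, ProtobufProduct> record : records) {
            context.getLogger().log("Product: " + record.value().getName());
        }
        return "OK";
    }
}
```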

powertools-kafka/src/test/java/software/amazon/lambda/powertools/kafka/serializers/KafkaProtobufDeserializerTest.java
Lines changed: 77 additions & 0 deletions

@@ -15,11 +15,14 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.google.protobuf.CodedOutputStream;
+
 import software.amazon.lambda.powertools.kafka.serializers.test.protobuf.TestProduct;
 
 class KafkaProtobufDeserializerTest {
@@ -72,4 +75,78 @@ void shouldThrowExceptionWhenDeserializingInvalidProtobufData() {
                 .isInstanceOf(IOException.class)
                 .hasMessageContaining("Failed to deserialize Protobuf data");
     }
+
+    @Test
+    void shouldDeserializeProtobufDataWithSimpleMessageIndex() throws IOException {
+        // Given
+        TestProduct product = TestProduct.newBuilder()
+                .setId(456)
+                .setName("Simple Index Product")
+                .setPrice(199.99)
+                .build();
+
+        // Create protobuf data with simple message index (single 0)
+        byte[] protobufDataWithSimpleIndex = createProtobufDataWithSimpleMessageIndex(product);
+
+        // When
+        TestProduct result = deserializer.deserializeObject(protobufDataWithSimpleIndex, TestProduct.class);
+
+        // Then
+        assertThat(result).isNotNull();
+        assertThat(result.getId()).isEqualTo(456);
+        assertThat(result.getName()).isEqualTo("Simple Index Product");
+        assertThat(result.getPrice()).isEqualTo(199.99);
+    }
+
+    @Test
+    void shouldDeserializeProtobufDataWithComplexMessageIndex() throws IOException {
+        // Given
+        TestProduct product = TestProduct.newBuilder()
+                .setId(789)
+                .setName("Complex Index Product")
+                .setPrice(299.99)
+                .build();
+
+        // Create protobuf data with complex message index (array [1,0])
+        byte[] protobufDataWithComplexIndex = createProtobufDataWithComplexMessageIndex(product);
+
+        // When
+        TestProduct result = deserializer.deserializeObject(protobufDataWithComplexIndex, TestProduct.class);
+
+        // Then
+        assertThat(result).isNotNull();
+        assertThat(result.getId()).isEqualTo(789);
+        assertThat(result.getName()).isEqualTo("Complex Index Product");
+        assertThat(result.getPrice()).isEqualTo(299.99);
+    }
+
+    private byte[] createProtobufDataWithSimpleMessageIndex(TestProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write simple message index (single 0)
+        codedOutput.writeUInt32NoTag(0);
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return baos.toByteArray();
+    }
+
+    private byte[] createProtobufDataWithComplexMessageIndex(TestProduct product) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        CodedOutputStream codedOutput = CodedOutputStream.newInstance(baos);
+
+        // Write complex message index array [1,0]
+        codedOutput.writeUInt32NoTag(2); // Array length
+        codedOutput.writeUInt32NoTag(1); // First index value
+        codedOutput.writeUInt32NoTag(0); // Second index value
+
+        // Write the protobuf data
+        product.writeTo(codedOutput);
+
+        codedOutput.flush();
+        return baos.toByteArray();
+    }
 }
