22
33import java .io .ByteArrayOutputStream ;
44import java .io .IOException ;
5+ import java .nio .ByteBuffer ;
56import java .util .Base64 ;
67
8+ import org .apache .kafka .common .utils .ByteUtils ;
79import org .demo .kafka .protobuf .ProtobufProduct ;
810
911import com .google .protobuf .CodedOutputStream ;
@@ -19,77 +21,102 @@ private GenerateProtobufSamples() {
1921 }
2022
2123 public static void main (String [] args ) throws IOException {
22- // Create a single product that will be used for all three scenarios
24+ // Create a single product that will be used for all four scenarios
2325 ProtobufProduct product = ProtobufProduct .newBuilder ()
2426 .setId (1001 )
2527 .setName ("Laptop" )
2628 .setPrice (999.99 )
2729 .build ();
2830
29- // Create three different serializations of the same product
31+ // Create four different serializations of the same product
3032 String standardProduct = serializeAndEncode (product );
31- String productWithSimpleIndex = serializeWithSimpleMessageIndex (product );
32- String productWithComplexIndex = serializeWithComplexMessageIndex (product );
33+ String productWithConfluentSimpleIndex = serializeWithConfluentSimpleMessageIndex (product );
34+ String productWithConfluentComplexIndex = serializeWithConfluentComplexMessageIndex (product );
35+ String productWithGlueMagicByte = serializeWithGlueMagicByte (product );
3336
3437 // Serialize and encode an integer key (same for all records)
3538 String encodedKey = serializeAndEncodeInteger (42 );
3639
3740 // Print the results
38- System .out .println ("Base64 encoded Protobuf products with different message index scenarios:" );
39- System .out .println ("\n 1. Standard Protobuf (no message index ):" );
41+ System .out .println ("Base64 encoded Protobuf products with different scenarios:" );
42+ System .out .println ("\n 1. Plain Protobuf (no schema registry ):" );
4043 System .out .println ("value: \" " + standardProduct + "\" " );
4144
42- System .out .println ("\n 2. Simple Message Index (single 0):" );
43- System .out .println ("value: \" " + productWithSimpleIndex + "\" " );
45+ System .out .println ("\n 2. Confluent with Simple Message Index (optimized single 0):" );
46+ System .out .println ("value: \" " + productWithConfluentSimpleIndex + "\" " );
4447
45- System .out .println ("\n 3. Complex Message Index (array [1,0]):" );
46- System .out .println ("value: \" " + productWithComplexIndex + "\" " );
48+ System .out .println ("\n 3. Confluent with Complex Message Index (array [1,0]):" );
49+ System .out .println ("value: \" " + productWithConfluentComplexIndex + "\" " );
50+
51+ System .out .println ("\n 4. Glue with Magic Byte:" );
52+ System .out .println ("value: \" " + productWithGlueMagicByte + "\" " );
4753
4854 // Print the merged event structure
4955 System .out .println ("\n " + "=" .repeat (80 ));
50- System .out .println ("MERGED EVENT WITH ALL THREE SCENARIOS" );
56+ System .out .println ("MERGED EVENT WITH ALL FOUR SCENARIOS" );
5157 System .out .println ("=" .repeat (80 ));
52- printSampleEvent (encodedKey , standardProduct , productWithSimpleIndex , productWithComplexIndex );
58+ printSampleEvent (encodedKey , standardProduct , productWithConfluentSimpleIndex , productWithConfluentComplexIndex ,
59+ productWithGlueMagicByte );
5360 }
5461
5562 private static String serializeAndEncode (ProtobufProduct product ) {
5663 return Base64 .getEncoder ().encodeToString (product .toByteArray ());
5764 }
5865
5966 /**
60- * Serializes a protobuf product with a simple Confluent message index (single 0).
67+ * Serializes a protobuf product with a simple Confluent message index (optimized single 0).
6168 * Format: [0][protobuf_data]
6269 *
6370 * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
6471 */
65- private static String serializeWithSimpleMessageIndex (ProtobufProduct product ) throws IOException {
72+ private static String serializeWithConfluentSimpleMessageIndex (ProtobufProduct product ) throws IOException {
6673 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
67- CodedOutputStream codedOutput = CodedOutputStream .newInstance (baos );
6874
69- // Write simple message index (single 0)
70- codedOutput . writeUInt32NoTag (0 );
75+ // Write optimized simple message index for Confluent (single 0 byte for [0] )
76+ baos . write (0 );
7177
7278 // Write the protobuf data
73- product .writeTo ( codedOutput );
79+ baos . write ( product .toByteArray () );
7480
75- codedOutput .flush ();
7681 return Base64 .getEncoder ().encodeToString (baos .toByteArray ());
7782 }
7883
7984 /**
8085 * Serializes a protobuf product with a complex Confluent message index (array [1,0]).
81- * Format: [2][1][0][protobuf_data] where 2 is the array length
86+ * Format: [2][1][0][protobuf_data] where 2 is the array length using varint encoding
8287 *
8388 * @see {@link https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format}
8489 */
85- private static String serializeWithComplexMessageIndex (ProtobufProduct product ) throws IOException {
90+ private static String serializeWithConfluentComplexMessageIndex (ProtobufProduct product ) throws IOException {
91+ ByteArrayOutputStream baos = new ByteArrayOutputStream ();
92+
93+ // Write complex message index array [1,0] using ByteUtils
94+ ByteBuffer buffer = ByteBuffer .allocate (1024 );
95+ ByteUtils .writeVarint (2 , buffer ); // Array length
96+ ByteUtils .writeVarint (1 , buffer ); // First index value
97+ ByteUtils .writeVarint (0 , buffer ); // Second index value
98+
99+ buffer .flip ();
100+ byte [] indexData = new byte [buffer .remaining ()];
101+ buffer .get (indexData );
102+ baos .write (indexData );
103+
104+ // Write the protobuf data
105+ baos .write (product .toByteArray ());
106+
107+ return Base64 .getEncoder ().encodeToString (baos .toByteArray ());
108+ }
109+
110+ /**
111+ * Serializes a protobuf product with Glue magic byte.
112+ * Format: [1][protobuf_data] where 1 is the magic byte
113+ */
114+ private static String serializeWithGlueMagicByte (ProtobufProduct product ) throws IOException {
86115 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
87116 CodedOutputStream codedOutput = CodedOutputStream .newInstance (baos );
88117
89- // Write complex message index array [1,0]
90- codedOutput .writeUInt32NoTag (2 ); // Array length
91- codedOutput .writeUInt32NoTag (1 ); // First index value
92- codedOutput .writeUInt32NoTag (0 ); // Second index value
118+ // Write Glue magic byte (single UInt32)
119+ codedOutput .writeUInt32NoTag (1 );
93120
94121 // Write the protobuf data
95122 product .writeTo (codedOutput );
@@ -103,8 +130,8 @@ private static String serializeAndEncodeInteger(Integer value) {
103130 return Base64 .getEncoder ().encodeToString (value .toString ().getBytes ());
104131 }
105132
106- private static void printSampleEvent (String key , String standardProduct , String simpleIndexProduct ,
107- String complexIndexProduct ) {
133+ private static void printSampleEvent (String key , String standardProduct , String confluentSimpleProduct ,
134+ String confluentComplexProduct , String glueProduct ) {
108135 System .out .println ("{\n " +
109136 " \" eventSource\" : \" aws:kafka\" ,\n " +
110137 " \" eventSourceArn\" : \" arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4\" ,\n "
@@ -134,12 +161,16 @@ private static void printSampleEvent(String key, String standardProduct, String
134161 " \" timestamp\" : 1545084650988,\n " +
135162 " \" timestampType\" : \" CREATE_TIME\" ,\n " +
136163 " \" key\" : \" " + key + "\" ,\n " +
137- " \" value\" : \" " + simpleIndexProduct + "\" ,\n " +
164+ " \" value\" : \" " + confluentSimpleProduct + "\" ,\n " +
138165 " \" headers\" : [\n " +
139166 " {\n " +
140167 " \" headerKey\" : [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n " +
141168 " }\n " +
142- " ]\n " +
169+ " ],\n " +
170+ " \" valueSchemaMetadata\" : {\n " +
171+ " \" schemaId\" : \" 123\" ,\n " +
172+ " \" dataFormat\" : \" PROTOBUF\" \n " +
173+ " }\n " +
143174 " },\n " +
144175 " {\n " +
145176 " \" topic\" : \" mytopic\" ,\n " +
@@ -148,12 +179,34 @@ private static void printSampleEvent(String key, String standardProduct, String
148179 " \" timestamp\" : 1545084650989,\n " +
149180 " \" timestampType\" : \" CREATE_TIME\" ,\n " +
150181 " \" key\" : null,\n " +
151- " \" value\" : \" " + complexIndexProduct + "\" ,\n " +
182+ " \" value\" : \" " + confluentComplexProduct + "\" ,\n " +
152183 " \" headers\" : [\n " +
153184 " {\n " +
154185 " \" headerKey\" : [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n " +
155186 " }\n " +
156- " ]\n " +
187+ " ],\n " +
188+ " \" valueSchemaMetadata\" : {\n " +
189+ " \" schemaId\" : \" 456\" ,\n " +
190+ " \" dataFormat\" : \" PROTOBUF\" \n " +
191+ " }\n " +
192+ " },\n " +
193+ " {\n " +
194+ " \" topic\" : \" mytopic\" ,\n " +
195+ " \" partition\" : 0,\n " +
196+ " \" offset\" : 18,\n " +
197+ " \" timestamp\" : 1545084650990,\n " +
198+ " \" timestampType\" : \" CREATE_TIME\" ,\n " +
199+ " \" key\" : \" " + key + "\" ,\n " +
200+ " \" value\" : \" " + glueProduct + "\" ,\n " +
201+ " \" headers\" : [\n " +
202+ " {\n " +
203+ " \" headerKey\" : [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]\n " +
204+ " }\n " +
205+ " ],\n " +
206+ " \" valueSchemaMetadata\" : {\n " +
207+ " \" schemaId\" : \" 12345678-1234-1234-1234-123456789012\" ,\n " +
208+ " \" dataFormat\" : \" PROTOBUF\" \n " +
209+ " }\n " +
157210 " }\n " +
158211 " ]\n " +
159212 " }\n " +
0 commit comments