 */
 package software.amazon.lambda.powertools.kafka;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Base64;
 import java.util.Map;
 
 import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
 import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
 import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.crac.Context;
@@ -88,12 +98,28 @@ public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
 
         DeserializationUtils.determineDeserializationType();
 
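+        // Prime the JSON and Avro deserialization paths so the classes involved are
+        // already loaded and initialized when the CRaC checkpoint is taken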
+        jsonPriming();
+        try {
+            avroPriming();
+        } catch (Exception e) {
+            // Continue without interruption if Avro priming fails
+        }
+
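+        // Preload classes ahead of the checkpoint so they are part of the snapshot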
+        ClassPreLoader.preloadClasses();
+    }
+
+    @Override
+    public void afterRestore(Context<? extends Resource> context) throws Exception {
+        // No action needed after restore
+    }
+
+    private void jsonPriming() {
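+        // Minimal synthetic aws:kafka event; it only needs to be well-formed enough
+        // to drive the KAFKA_JSON deserializer once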
         String kafkaJson = "{\n" +
                 "  \"eventSource\": \"aws:kafka\",\n" +
                 "  \"records\": {\n" +
-                "    \"test-topic-1\": [\n" +
+                "    \"prime-topic-1\": [\n" +
                 "      {\n" +
-                "        \"topic\": \"test-topic-1\",\n" +
+                "        \"topic\": \"prime-topic-1\",\n" +
                 "        \"partition\": 0,\n" +
                 "        \"offset\": 0,\n" +
                 "        \"timestamp\": 1545084650987,\n" +
@@ -105,24 +131,77 @@ public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
                 "    ]\n" +
                 "  }\n" +
                 "}";
-
-        Type consumerRecords = new ParameterizedType() {
-            @Override
-            public Type[] getActualTypeArguments() { return new Type[] { String.class, String.class }; }
-            @Override
-            public Type getRawType() { return ConsumerRecords.class; }
-            @Override
-            public Type getOwnerType() { return null; }
-        };
-
+        Type consumerRecords = createConsumerRecordsType(String.class, String.class);
         PowertoolsDeserializer deserializers = DESERIALIZERS.get(DeserializationType.KAFKA_JSON);
         deserializers.fromJson(kafkaJson, consumerRecords);
+    }
 
-        ClassPreLoader.preloadClasses();
+    private void avroPriming() throws IOException {
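+        // Inline schema keeps priming self-contained; no generated Avro classes required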
+        String avroSchema = "{\n" +
+                "  \"type\": \"record\",\n" +
+                "  \"name\": \"SimpleProduct\",\n" +
+                "  \"namespace\": \"software.amazon.lambda.powertools.kafka.test\",\n" +
+                "  \"fields\": [\n" +
+                "    {\"name\": \"id\", \"type\": \"string\"},\n" +
+                "    {\"name\": \"name\", \"type\": \"string\"},\n" +
+                "    {\"name\": \"price\", \"type\": [\"null\", \"double\"], \"default\": null}\n" +
+                "  ]\n" +
+                "}";
+        Schema schema = new Schema.Parser().parse(avroSchema);
+
+        // Create a GenericRecord
+        GenericRecord avroRecord = new GenericData.Record(schema);
+        avroRecord.put("id", "prime-topic-1");
+        avroRecord.put("name", "Prime Product");
+        avroRecord.put("price", 0.0);
+
+        // Serialize the record to Avro binary and Base64-encode it for the event payload
+        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
+        datumWriter.write(avroRecord, encoder);
+        // Flush the encoder: BinaryEncoder buffers internally, so without this the
+        // output stream may be empty or truncated
+        encoder.flush();
+        byte[] avroBytes = outputStream.toByteArray();
+        String base64Value = Base64.getEncoder().encodeToString(avroBytes);
+
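+        // Wrap the Base64 Avro payload in a synthetic aws:kafka event envelope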
+        String kafkaAvroJson = "{\n" +
+                "  \"eventSource\": \"aws:kafka\",\n" +
+                "  \"records\": {\n" +
+                "    \"prime-topic-1\": [\n" +
+                "      {\n" +
+                "        \"topic\": \"prime-topic-1\",\n" +
+                "        \"partition\": 0,\n" +
+                "        \"offset\": 0,\n" +
+                "        \"timestamp\": 1545084650987,\n" +
+                "        \"timestampType\": \"CREATE_TIME\",\n" +
+                "        \"key\": null,\n" +
+                "        \"value\": \"" + base64Value + "\",\n" +
+                "        \"headers\": []\n" +
+                "      }\n" +
+                "    ]\n" +
+                "  }\n" +
+                "}";
+
+        Type records = createConsumerRecordsType(String.class, GenericRecord.class);
+        PowertoolsDeserializer deserializers = DESERIALIZERS.get(DeserializationType.KAFKA_AVRO);
+        deserializers.fromJson(kafkaAvroJson, records);
     }
 
-    @Override
-    public void afterRestore(Context<? extends Resource> context) throws Exception {
-        // No action needed after restore
+    private Type createConsumerRecordsType(Class<?> keyClass, Class<?> valueClass) {
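+        // Build a Type token for ConsumerRecords<K, V> so the deserializer can resolve
+        // the generic key and value types reflectively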
+        return new ParameterizedType() {
+            @Override
+            public Type[] getActualTypeArguments() {
+                return new Type[] { keyClass, valueClass };
+            }
+
+            @Override
+            public Type getRawType() {
+                return ConsumerRecords.class;
+            }
+
+            @Override
+            public Type getOwnerType() {
+                return null;
+            }
+        };
     }
 }