@@ -12,26 +12,16 @@
  */
 package software.amazon.lambda.powertools.kafka;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
-import java.util.Base64;
 import java.util.Map;
 
 import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
 import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
 import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.EncoderFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.crac.Context;
@@ -99,11 +89,6 @@ public void beforeCheckpoint(Context<? extends Resource> context) throws Exception
         DeserializationUtils.determineDeserializationType();
 
         jsonPriming();
-        try {
-            avroPriming();
-        } catch (Exception e) {
-            // Continue without any interruption if avro priming fails
-        }
 
         ClassPreLoader.preloadClasses();
     }
@@ -136,56 +121,6 @@ private void jsonPriming(){ |
         deserializers.fromJson(kafkaJson, consumerRecords);
     }
 
-    private void avroPriming() throws IOException {
-        String avroSchema = "{\n" +
-                "  \"type\": \"record\",\n" +
-                "  \"name\": \"SimpleProduct\",\n" +
-                "  \"namespace\": \"software.amazon.lambda.powertools.kafka.test\",\n" +
-                "  \"fields\": [\n" +
-                "    {\"name\": \"id\", \"type\": \"string\"},\n" +
-                "    {\"name\": \"name\", \"type\": \"string\"},\n" +
-                "    {\"name\": \"price\", \"type\": [\"null\", \"double\"], \"default\": null}\n" +
-                "  ]\n" +
-                "}";
-        Schema schema = new Schema.Parser().parse(avroSchema);
-
-        // Create a GenericRecord
-        GenericRecord avroRecord = new GenericData.Record(schema);
-        avroRecord.put("id", "prime-topic-1");
-        avroRecord.put("name", "Prime Product");
-        avroRecord.put("price", 0.0);
-
-        // Create Kafka event JSON with avro data
-        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
-        datumWriter.write(avroRecord, encoder);
-        byte[] avroBytes = outputStream.toByteArray();
-        String base64Value = Base64.getEncoder().encodeToString(avroBytes);
-
-        String kafkaAvroJson = "{\n" +
-                "  \"eventSource\": \"aws:kafka\",\n" +
-                "  \"records\": {\n" +
-                "    \"prime-topic-1\": [\n" +
-                "      {\n" +
-                "        \"topic\": \"prime-topic-1\",\n" +
-                "        \"partition\": 0,\n" +
-                "        \"offset\": 0,\n" +
-                "        \"timestamp\": 1545084650987,\n" +
-                "        \"timestampType\": \"CREATE_TIME\",\n" +
-                "        \"key\": null,\n" +
-                "        \"value\": \"" + base64Value + "\",\n" +
-                "        \"headers\": []\n" +
-                "      }\n" +
-                "    ]\n" +
-                "  }\n" +
-                "}";
-
-        Type records = createConsumerRecordsType(String.class, GenericRecord.class);
-        PowertoolsDeserializer deserializers = DESERIALIZERS.get(DeserializationType.KAFKA_AVRO);
-        deserializers.fromJson(kafkaAvroJson, records);
-    }
-
     private Type createConsumerRecordsType(Class<?> keyClass, Class<?> valueClass) {
         return new ParameterizedType() {
             @Override