Skip to content

Commit f42acaa

Browse files
committed
feat: add CRaC priming support to powertools-kafka module
- Add maven test profile and classesloaded.txt for preloading - Add Crac dependency and update PowertoolsSerializer to register as Crac Resource - Add tests in PowertoolsSerializerTest to assert beforeCheckpoint and afterRestore hooks do not throw exception
1 parent 53adbe2 commit f42acaa

File tree

4 files changed

+6752
-2
lines changed

4 files changed

+6752
-2
lines changed

powertools-kafka/pom.xml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@
7878
<artifactId>aws-lambda-java-serialization</artifactId>
7979
<version>${lambda-serialization.version}</version>
8080
</dependency>
81+
<dependency>
82+
<groupId>software.amazon.lambda</groupId>
83+
<artifactId>powertools-common</artifactId>
84+
</dependency>
85+
<dependency>
86+
<groupId>org.crac</groupId>
87+
<artifactId>crac</artifactId>
88+
</dependency>
8189

8290
<!-- Test dependencies -->
8391
<dependency>
@@ -223,4 +231,25 @@
223231
</plugins>
224232
</build>
225233

234+
<profiles>
235+
<profile>
236+
<id>generate-classesloaded-file</id>
237+
<build>
238+
<plugins>
239+
<plugin>
240+
<groupId>org.apache.maven.plugins</groupId>
241+
<artifactId>maven-surefire-plugin</artifactId>
242+
<configuration>
243+
<argLine>
244+
-Xlog:class+load=info:classesloaded.txt
245+
--add-opens java.base/java.util=ALL-UNNAMED
246+
--add-opens java.base/java.lang=ALL-UNNAMED
247+
</argLine>
248+
</configuration>
249+
</plugin>
250+
</plugins>
251+
</build>
252+
</profile>
253+
</profiles>
254+
226255
</project>

powertools-kafka/src/main/java/software/amazon/lambda/powertools/kafka/PowertoolsSerializer.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,20 @@
1414

1515
import java.io.InputStream;
1616
import java.io.OutputStream;
17+
import java.lang.reflect.ParameterizedType;
1718
import java.lang.reflect.Type;
1819
import java.util.Map;
1920

2021
import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
2122
import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
23+
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
2224

25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.apache.kafka.clients.consumer.ConsumerRecords;
27+
import org.crac.Context;
28+
import org.crac.Core;
29+
import org.crac.Resource;
30+
import software.amazon.lambda.powertools.common.internal.ClassPreLoader;
2331
import software.amazon.lambda.powertools.kafka.internal.DeserializationUtils;
2432
import software.amazon.lambda.powertools.kafka.serializers.KafkaAvroDeserializer;
2533
import software.amazon.lambda.powertools.kafka.serializers.KafkaJsonDeserializer;
@@ -30,11 +38,11 @@
3038
/**
3139
* Custom Lambda serializer supporting Kafka events. It delegates to the appropriate deserializer based on the
3240
* deserialization type specified by {@link software.amazon.lambda.powertools.kafka.Deserialization} annotation.
33-
*
41+
*
3442
* Kafka serializers need to be specified explicitly, otherwise, the default Lambda serializer from
3543
* {@link com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory} will be used.
3644
*/
37-
public class PowertoolsSerializer implements CustomPojoSerializer {
45+
public class PowertoolsSerializer implements CustomPojoSerializer, Resource {
3846
private static final Map<DeserializationType, PowertoolsDeserializer> DESERIALIZERS = Map.of(
3947
DeserializationType.KAFKA_JSON, new KafkaJsonDeserializer(),
4048
DeserializationType.KAFKA_AVRO, new KafkaAvroDeserializer(),
@@ -43,6 +51,13 @@ DeserializationType.KAFKA_PROTOBUF, new KafkaProtobufDeserializer(),
4351

4452
private final PowertoolsDeserializer deserializer;
4553

54+
private static final PowertoolsSerializer INSTANCE = new PowertoolsSerializer();
55+
56+
// CRaC registration happens at class loading time
57+
static{
58+
Core.getGlobalContext().register(INSTANCE);
59+
}
60+
4661
public PowertoolsSerializer() {
4762
this.deserializer = DESERIALIZERS.getOrDefault(
4863
DeserializationUtils.determineDeserializationType(),
@@ -64,4 +79,50 @@ public <T> void toJson(T value, OutputStream output, Type type) {
6479
// This is the Lambda default Output serialization
6580
JacksonFactory.getInstance().getSerializer(type).toJson(value, output);
6681
}
82+
83+
@Override
84+
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
85+
JacksonFactory.getInstance().getSerializer(KafkaEvent.class);
86+
JacksonFactory.getInstance().getSerializer(ConsumerRecord.class);
87+
JacksonFactory.getInstance().getSerializer(String.class);
88+
89+
DeserializationUtils.determineDeserializationType();
90+
91+
String kafkaJson = "{\n" +
92+
" \"eventSource\": \"aws:kafka\",\n" +
93+
" \"records\": {\n" +
94+
" \"test-topic-1\": [\n" +
95+
" {\n" +
96+
" \"topic\": \"test-topic-1\",\n" +
97+
" \"partition\": 0,\n" +
98+
" \"offset\": 0,\n" +
99+
" \"timestamp\": 1545084650987,\n" +
100+
" \"timestampType\": \"CREATE_TIME\",\n" +
101+
" \"key\": null,\n" +
102+
" \"value\": null,\n" +
103+
" \"headers\": []\n" +
104+
" }\n" +
105+
" ]\n" +
106+
" }\n" +
107+
"}";
108+
109+
Type consumerRecords = new ParameterizedType() {
110+
@Override
111+
public Type[] getActualTypeArguments() { return new Type[] { String.class, String.class }; }
112+
@Override
113+
public Type getRawType() { return ConsumerRecords.class; }
114+
@Override
115+
public Type getOwnerType() { return null; }
116+
};
117+
118+
PowertoolsDeserializer deserializers = DESERIALIZERS.get(DeserializationType.KAFKA_JSON);
119+
deserializers.fromJson(kafkaJson, consumerRecords);
120+
121+
ClassPreLoader.preloadClasses();
122+
}
123+
124+
@Override
125+
public void afterRestore(Context<? extends Resource> context) throws Exception {
126+
// No action needed after restore
127+
}
67128
}

0 commit comments

Comments
 (0)