1414
1515import java .io .InputStream ;
1616import java .io .OutputStream ;
17+ import java .lang .reflect .ParameterizedType ;
1718import java .lang .reflect .Type ;
1819import java .util .Map ;
1920
2021import com .amazonaws .services .lambda .runtime .CustomPojoSerializer ;
2122import com .amazonaws .services .lambda .runtime .serialization .factories .JacksonFactory ;
23+ import com .amazonaws .services .lambda .runtime .events .KafkaEvent ;
2224
25+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
26+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
27+ import org .crac .Context ;
28+ import org .crac .Core ;
29+ import org .crac .Resource ;
30+ import software .amazon .lambda .powertools .common .internal .ClassPreLoader ;
2331import software .amazon .lambda .powertools .kafka .internal .DeserializationUtils ;
2432import software .amazon .lambda .powertools .kafka .serializers .KafkaAvroDeserializer ;
2533import software .amazon .lambda .powertools .kafka .serializers .KafkaJsonDeserializer ;
3038/**
3139 * Custom Lambda serializer supporting Kafka events. It delegates to the appropriate deserializer based on the
3240 * deserialization type specified by {@link software.amazon.lambda.powertools.kafka.Deserialization} annotation.
33- *
41+ *
3442 * Kafka serializers need to be specified explicitly, otherwise, the default Lambda serializer from
3543 * {@link com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory} will be used.
3644 */
37- public class PowertoolsSerializer implements CustomPojoSerializer {
45+ public class PowertoolsSerializer implements CustomPojoSerializer , Resource {
3846 private static final Map <DeserializationType , PowertoolsDeserializer > DESERIALIZERS = Map .of (
3947 DeserializationType .KAFKA_JSON , new KafkaJsonDeserializer (),
4048 DeserializationType .KAFKA_AVRO , new KafkaAvroDeserializer (),
@@ -43,6 +51,13 @@ DeserializationType.KAFKA_PROTOBUF, new KafkaProtobufDeserializer(),
4351
4452 private final PowertoolsDeserializer deserializer ;
4553
54+ private static final PowertoolsSerializer INSTANCE = new PowertoolsSerializer ();
55+
56+ // CRaC registration happens at class loading time
57+ static {
58+ Core .getGlobalContext ().register (INSTANCE );
59+ }
60+
4661 public PowertoolsSerializer () {
4762 this .deserializer = DESERIALIZERS .getOrDefault (
4863 DeserializationUtils .determineDeserializationType (),
@@ -64,4 +79,50 @@ public <T> void toJson(T value, OutputStream output, Type type) {
6479 // This is the Lambda default Output serialization
6580 JacksonFactory .getInstance ().getSerializer (type ).toJson (value , output );
6681 }
82+
83+ @ Override
84+ public void beforeCheckpoint (Context <? extends Resource > context ) throws Exception {
85+ JacksonFactory .getInstance ().getSerializer (KafkaEvent .class );
86+ JacksonFactory .getInstance ().getSerializer (ConsumerRecord .class );
87+ JacksonFactory .getInstance ().getSerializer (String .class );
88+
89+ DeserializationUtils .determineDeserializationType ();
90+
91+ String kafkaJson = "{\n " +
92+ " \" eventSource\" : \" aws:kafka\" ,\n " +
93+ " \" records\" : {\n " +
94+ " \" test-topic-1\" : [\n " +
95+ " {\n " +
96+ " \" topic\" : \" test-topic-1\" ,\n " +
97+ " \" partition\" : 0,\n " +
98+ " \" offset\" : 0,\n " +
99+ " \" timestamp\" : 1545084650987,\n " +
100+ " \" timestampType\" : \" CREATE_TIME\" ,\n " +
101+ " \" key\" : null,\n " +
102+ " \" value\" : null,\n " +
103+ " \" headers\" : []\n " +
104+ " }\n " +
105+ " ]\n " +
106+ " }\n " +
107+ "}" ;
108+
109+ Type consumerRecords = new ParameterizedType () {
110+ @ Override
111+ public Type [] getActualTypeArguments () { return new Type [] { String .class , String .class }; }
112+ @ Override
113+ public Type getRawType () { return ConsumerRecords .class ; }
114+ @ Override
115+ public Type getOwnerType () { return null ; }
116+ };
117+
118+ PowertoolsDeserializer deserializers = DESERIALIZERS .get (DeserializationType .KAFKA_JSON );
119+ deserializers .fromJson (kafkaJson , consumerRecords );
120+
121+ ClassPreLoader .preloadClasses ();
122+ }
123+
124+ @ Override
125+ public void afterRestore (Context <? extends Resource > context ) throws Exception {
126+ // No action needed after restore
127+ }
67128}
0 commit comments