Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions powertools-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@
<artifactId>sdk-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.crac</groupId>
<artifactId>crac</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -238,4 +242,25 @@
</plugins>
</build>

<profiles>
<!-- Optional profile: runs the test suite with JVM class-load logging enabled so
     that every loaded class is recorded to classesloaded.txt. The resulting file
     is presumably consumed by ClassPreLoader for CRaC/SnapStart priming — activate
     manually with -Pgenerate-classesloaded-file when the list needs regenerating. -->
<profile>
<id>generate-classesloaded-file</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- -Xlog:class+load writes one line per loaded class to classesloaded.txt;
     the add-opens flags allow the reflective access the tests require. -->
<argLine>
-Xlog:class+load=info:classesloaded.txt
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import software.amazon.lambda.powertools.common.internal.ClassPreLoader;
import software.amazon.lambda.powertools.kafka.internal.DeserializationUtils;
import software.amazon.lambda.powertools.kafka.serializers.KafkaAvroDeserializer;
import software.amazon.lambda.powertools.kafka.serializers.KafkaJsonDeserializer;
Expand All @@ -30,11 +38,11 @@
/**
* Custom Lambda serializer supporting Kafka events. It delegates to the appropriate deserializer based on the
* deserialization type specified by {@link software.amazon.lambda.powertools.kafka.Deserialization} annotation.
*
*
* Kafka serializers need to be specified explicitly, otherwise, the default Lambda serializer from
* {@link com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory} will be used.
*/
public class PowertoolsSerializer implements CustomPojoSerializer {
public class PowertoolsSerializer implements CustomPojoSerializer, Resource {
private static final Map<DeserializationType, PowertoolsDeserializer> DESERIALIZERS = Map.of(
DeserializationType.KAFKA_JSON, new KafkaJsonDeserializer(),
DeserializationType.KAFKA_AVRO, new KafkaAvroDeserializer(),
Expand All @@ -43,6 +51,13 @@ DeserializationType.KAFKA_PROTOBUF, new KafkaProtobufDeserializer(),

private final PowertoolsDeserializer deserializer;

// Held in a static field so the registered Resource cannot be garbage-collected
// before the checkpoint hooks run. NOTE(review): Lambda may instantiate its own
// PowertoolsSerializer via the serializer SPI, distinct from this instance —
// priming still works because the warmed caches are static/global; confirm.
private static final PowertoolsSerializer INSTANCE = new PowertoolsSerializer();

// CRaC registration happens at class loading time
static {
Core.getGlobalContext().register(INSTANCE);
}

public PowertoolsSerializer() {
this.deserializer = DESERIALIZERS.getOrDefault(
DeserializationUtils.determineDeserializationType(),
Expand All @@ -64,4 +79,64 @@ public <T> void toJson(T value, OutputStream output, Type type) {
// This is the Lambda default Output serialization
JacksonFactory.getInstance().getSerializer(type).toJson(value, output);
}

/**
 * CRaC hook executed before a checkpoint (snapshot) is taken. Warms up the
 * serialization machinery — Jackson serializer lookups, deserialization-type
 * resolution, a full JSON deserialization pass, and class preloading — so a
 * restored snapshot skips this initialization cost.
 *
 * @param context the CRaC context invoking this resource
 * @throws Exception if any priming step fails; propagated to the CRaC engine
 */
@Override
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
    // Populate the Jackson serializer cache for the types handled at runtime.
    JacksonFactory jacksonFactory = JacksonFactory.getInstance();
    jacksonFactory.getSerializer(KafkaEvent.class);
    jacksonFactory.getSerializer(ConsumerRecord.class);
    jacksonFactory.getSerializer(String.class);

    // Exercise the annotation-scanning path that resolves the deserialization type.
    DeserializationUtils.determineDeserializationType();

    // Run one end-to-end JSON deserialization to prime Jackson's reflection state.
    jsonPriming();

    // Force-load the classes listed in the pre-generated class list, if present.
    ClassPreLoader.preloadClasses();
}

/**
 * CRaC hook executed after the process is restored from a snapshot.
 * Intentionally a no-op: all state primed in {@code beforeCheckpoint} is
 * expected to remain valid across restore.
 *
 * @param context the CRaC context invoking this resource
 * @throws Exception never thrown by this implementation
 */
@Override
public void afterRestore(Context<? extends Resource> context) throws Exception {
// No action needed after restore
}

/**
 * Deserializes a minimal synthetic Kafka event through the JSON deserializer to
 * warm up Jackson's reflective type handling before a CRaC checkpoint. The
 * payload carries a single record with null key/value, which is sufficient to
 * exercise the full parsing path.
 */
private void jsonPriming() {
    // Synthetic aws:kafka event; field layout mirrors the Lambda Kafka event shape.
    String primingPayload = "{\n" +
            "  \"eventSource\": \"aws:kafka\",\n" +
            "  \"records\": {\n" +
            "    \"prime-topic-1\": [\n" +
            "      {\n" +
            "        \"topic\": \"prime-topic-1\",\n" +
            "        \"partition\": 0,\n" +
            "        \"offset\": 0,\n" +
            "        \"timestamp\": 1545084650987,\n" +
            "        \"timestampType\": \"CREATE_TIME\",\n" +
            "        \"key\": null,\n" +
            "        \"value\": null,\n" +
            "        \"headers\": []\n" +
            "      }\n" +
            "    ]\n" +
            "  }\n" +
            "}";
    // String key/value types are enough for priming; runtime types may differ.
    PowertoolsDeserializer jsonDeserializer = DESERIALIZERS.get(DeserializationType.KAFKA_JSON);
    jsonDeserializer.fromJson(primingPayload, createConsumerRecordsType(String.class, String.class));
}

/**
 * Builds a synthetic {@link ParameterizedType} representing
 * {@code ConsumerRecords<K, V>} for the given key and value classes, suitable
 * for passing to type-driven deserializer APIs.
 *
 * @param keyType   class of the record key type argument
 * @param valueType class of the record value type argument
 * @return a parameterized {@code ConsumerRecords} type with the given arguments
 */
private Type createConsumerRecordsType(Class<?> keyType, Class<?> valueType) {
    return new ParameterizedType() {
        @Override
        public Type getRawType() {
            // The generic container being parameterized.
            return ConsumerRecords.class;
        }

        @Override
        public Type[] getActualTypeArguments() {
            // Fresh array per call so callers cannot mutate shared state.
            return new Type[] { keyType, valueType };
        }

        @Override
        public Type getOwnerType() {
            // ConsumerRecords is a top-level class; no enclosing owner type.
            return null;
        }
    };
}
}
Loading
Loading