Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/check-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ on:
- 'powertools-large-messages/**'
- 'powertools-logging/**'
- 'powertools-metrics/**'
- 'powertools-kafka/**'
- 'powertools-parameters/**'
- 'powertools-serialization/**'
- 'powertools-sqs/**'
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/check-spotbugs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ on:
- 'powertools-large-messages/**'
- 'powertools-logging/**'
- 'powertools-metrics/**'
- 'powertools-kafka/**'
- 'powertools-parameters/**'
- 'powertools-serialization/**'
- 'powertools-sqs/**'
Expand Down Expand Up @@ -47,4 +48,4 @@ jobs:
distribution: 'corretto'
java-version: 21
- name: Build with Maven for spotbugs check to mark build as fail if violations found
run: mvn -Pbuild-with-spotbugs -B install --file pom.xml -DskipTests -Dmaven.javadoc.skip=true -Dspotbugs.failOnError=true
run: mvn -Pbuild-with-spotbugs -B install --file pom.xml -DskipTests -Dmaven.javadoc.skip=true -Dspotbugs.failOnError=true
25 changes: 25 additions & 0 deletions powertools-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@
<artifactId>sdk-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.crac</groupId>
<artifactId>crac</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -238,4 +242,25 @@
</plugins>
</build>

<profiles>
<profile>
<id>generate-classesloaded-file</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>
-Xlog:class+load=info:classesloaded.txt
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.CustomPojoSerializer;
import com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;
import software.amazon.lambda.powertools.common.internal.ClassPreLoader;
import software.amazon.lambda.powertools.kafka.internal.DeserializationUtils;
import software.amazon.lambda.powertools.kafka.serializers.KafkaAvroDeserializer;
import software.amazon.lambda.powertools.kafka.serializers.KafkaJsonDeserializer;
Expand All @@ -30,11 +38,11 @@
/**
* Custom Lambda serializer supporting Kafka events. It delegates to the appropriate deserializer based on the
* deserialization type specified by {@link software.amazon.lambda.powertools.kafka.Deserialization} annotation.
*
*
* Kafka serializers need to be specified explicitly, otherwise, the default Lambda serializer from
* {@link com.amazonaws.services.lambda.runtime.serialization.factories.JacksonFactory} will be used.
*/
public class PowertoolsSerializer implements CustomPojoSerializer {
public class PowertoolsSerializer implements CustomPojoSerializer, Resource {
private static final Map<DeserializationType, PowertoolsDeserializer> DESERIALIZERS = Map.of(
DeserializationType.KAFKA_JSON, new KafkaJsonDeserializer(),
DeserializationType.KAFKA_AVRO, new KafkaAvroDeserializer(),
Expand All @@ -43,6 +51,13 @@ DeserializationType.KAFKA_PROTOBUF, new KafkaProtobufDeserializer(),

private final PowertoolsDeserializer deserializer;

// Singleton registered with CRaC so this class participates in checkpoint priming.
// NOTE(review): the Lambda runtime discovers CustomPojoSerializer via SPI and may
// create further instances through the public constructor; this instance exists
// only for the org.crac.Resource callbacks. Constructing it at class-load time
// also runs DeserializationUtils.determineDeserializationType() (see constructor).
private static final PowertoolsSerializer INSTANCE = new PowertoolsSerializer();

// CRaC registration happens at class loading time
static {
Core.getGlobalContext().register(INSTANCE);
}

public PowertoolsSerializer() {
this.deserializer = DESERIALIZERS.getOrDefault(
DeserializationUtils.determineDeserializationType(),
Expand All @@ -64,4 +79,64 @@ public <T> void toJson(T value, OutputStream output, Type type) {
// This is the Lambda default Output serialization
JacksonFactory.getInstance().getSerializer(type).toJson(value, output);
}

@Override
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
// CRaC callback executed just before the runtime snapshot is taken. Each call
// below is "priming": it forces expensive one-time initialization to happen
// now so the work is captured in the checkpoint instead of after restore.

// Build Jackson serializers for types this serializer touches, so their
// reflective setup and class loading occur before the snapshot.
JacksonFactory.getInstance().getSerializer(KafkaEvent.class);
JacksonFactory.getInstance().getSerializer(ConsumerRecord.class);
JacksonFactory.getInstance().getSerializer(String.class);

// Resolve the configured deserialization type once to load the related classes.
DeserializationUtils.determineDeserializationType();

// Push one synthetic Kafka JSON event through the deserialization path.
jsonPriming();

// Pre-load the classes recorded for this module (presumably produced by the
// generate-classesloaded-file Maven profile — confirm against ClassPreLoader).
ClassPreLoader.preloadClasses();
}

// CRaC callback executed after the snapshot is restored; all priming was done
// in beforeCheckpoint, so there is nothing left to re-initialize here.
@Override
public void afterRestore(Context<? extends Resource> context) throws Exception {
// No action needed after restore
}

/**
 * Runs a minimal synthetic Kafka event through the JSON deserialization path so
 * that the involved classes are loaded and initialized before the CRaC checkpoint.
 */
private void jsonPriming() {
    // Smallest well-formed Kafka event payload: one topic, one record with
    // null key/value and no headers.
    String primingEvent = String.join("\n",
            "{",
            "  \"eventSource\": \"aws:kafka\",",
            "  \"records\": {",
            "    \"prime-topic-1\": [",
            "      {",
            "        \"topic\": \"prime-topic-1\",",
            "        \"partition\": 0,",
            "        \"offset\": 0,",
            "        \"timestamp\": 1545084650987,",
            "        \"timestampType\": \"CREATE_TIME\",",
            "        \"key\": null,",
            "        \"value\": null,",
            "        \"headers\": []",
            "      }",
            "    ]",
            "  }",
            "}");
    // Target type: ConsumerRecords<String, String>.
    Type recordsType = createConsumerRecordsType(String.class, String.class);
    PowertoolsDeserializer jsonDeserializer = DESERIALIZERS.get(DeserializationType.KAFKA_JSON);
    jsonDeserializer.fromJson(primingEvent, recordsType);
}

/**
 * Builds a {@link ParameterizedType} describing {@code ConsumerRecords<K, V>}
 * for the given key and value classes, suitable for passing to the typed
 * deserialization API.
 *
 * @param keyClass   class of the record key type argument
 * @param valueClass class of the record value type argument
 * @return a synthetic parameterized type for {@link ConsumerRecords}
 */
private Type createConsumerRecordsType(Class<?> keyClass, Class<?> valueClass) {
    final Type[] typeArguments = { keyClass, valueClass };
    return new ParameterizedType() {
        @Override
        public Type getRawType() {
            return ConsumerRecords.class;
        }

        @Override
        public Type getOwnerType() {
            // Top-level type: no enclosing owner.
            return null;
        }

        @Override
        public Type[] getActualTypeArguments() {
            // Fresh array per call, matching the original's behavior of
            // allocating the argument array on each invocation.
            return typeArguments.clone();
        }
    };
}
}
Loading
Loading