diff --git a/Dockerfile b/Dockerfile index 996b87c..1afc87e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,12 @@ -# The parent Flink image (flink:1.18.1-java11) only contains the JRE (openjdk:11-jre), and it is missing key -# diagnostic tools. This multistage build will overwrite the JRE with the JDK from openjdk:11 +# The parent Flink image (flink:1.18.1-java17) only contains the JRE (openjdk:17-jre), and it is missing key +# diagnostic tools. This multistage build will overwrite the JRE with the JDK from openjdk:17 # See https://docs.docker.com/develop/develop-images/multistage-build/ -FROM --platform=linux/amd64 openjdk:11 AS jdk_image -FROM --platform=linux/amd64 flink:1.18.1-java11 +# Add --platform=linux/amd64 to the FROM commands below when on Apple silicon +#FROM --platform=linux/amd64 openjdk:17 as jdk_image +FROM flink:1.18.1-java17 # Copy the JDK from the jdk_image -COPY --from=jdk_image /usr/local/openjdk-11 /opt/java/openjdk/ +#COPY --from=jdk_image /usr/java/openjdk-17 /opt/java/openjdk RUN sed -i -e 's/^.*networkaddress.cache.ttl=.*$/networkaddress.cache.ttl=30/g' /opt/java/openjdk/conf/security/java.security RUN sed -i -e 's/^.*networkaddress.cache.negative.ttl=.*$/networkaddress.cache.negative.ttl=10/g' /opt/java/openjdk/conf/security/java.security diff --git a/docker-compose.yml b/docker-compose.yml index 67375cc..97b8e3b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,25 +1,24 @@ services: localstack: - image: localstack/localstack:3.0.2 - platform: linux/amd64 + image: localstack/localstack:3.1.0 + container_name: demo.localhost.localstack.cloud profiles: [kinesis,statefun,all] ports: - "4566:4566" environment: - SERVICES=kinesis - - HOSTNAME=localstack + - HOSTNAME=demo.localhost.localstack.cloud - AWS_REGION=us-east-1 - AWS_ACCESS_KEY_ID=example-access-key-id - AWS_SECRET_ACCESS_KEY=example-secret-access-key - - AWS_ENDPOINT=https://localstack:4566 - - AWS_ENDPOINT_URL=https://localstack:4566 + - AWS_ENDPOINT=https://demo.localhost.localstack.cloud:4566 + - AWS_ENDPOINT_URL=https://demo.localhost.localstack.cloud:4566 - DEBUG=0 - KINESIS_ERROR_PROBABILITY=0.0 - DOCKER_HOST=unix:///var/run/docker.sock create-streams: image: amazon/aws-cli - platform: linux/amd64 profiles: [kinesis,statefun,all] depends_on: - localstack @@ -35,15 +34,15 @@ services: " set -x echo Creating ingress stream - until aws --endpoint-url=http://localstack:4566 kinesis create-stream --stream-name example-ingress-stream --shard-count 1; do sleep 1; done + until aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis create-stream --stream-name example-ingress-stream --shard-count 1; do sleep 1; done echo Creating egress stream - aws --endpoint-url=http://localstack:4566 kinesis create-stream --stream-name example-egress-stream --shard-count 1 + aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis create-stream --stream-name example-egress-stream --shard-count 1 echo Listing streams - aws --endpoint-url=http://localstack:4566 kinesis list-streams + aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis list-streams " jobmanager: - platform: linux/amd64 + container_name: demo.flink-jobmanager profiles: [statefun,all] depends_on: - create-streams @@ -52,8 +51,8 @@ services: # to restore from savepoint, add this to the command: --fromSavepoint file:///savepoints/savepoint-xxxx-yyyyyy command: > standalone-job - -D "rest.address=jobmanager" - -D "jobmanager.rpc.address=jobmanager" + -D "rest.address=demo.flink-jobmanager" + -D "jobmanager.rpc.address=demo.flink-jobmanager" -D "state.savepoints.dir=file:///savepoints" -D "state.checkpoints.dir=file:///checkpoints" --job-classname org.apache.flink.statefun.flink.core.StatefulFunctionsJob @@ -62,22 +61,24 @@ services: expose: - "6123" ports: + - "5065:5065" - "8081:8081" environment: - IS_LOCAL_DEV=true - AWS_REGION=us-east-1 - AWS_ACCESS_KEY_ID=example-access-key-id - AWS_SECRET_ACCESS_KEY=example-secret-access-key - - AWS_ENDPOINT=https://host.docker.internal:4566 + - AWS_ENDPOINT=https://demo.localhost.localstack.cloud:4566 - AWS_CBOR_DISABLE=true - USE_ENHANCED_FANOUT=false # Disable enhanced fanout for local development, there seems to be a bug in localstack - - FLINK_ENV_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking + - FLINK_ENV_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5065 +# - FLINK_ENV_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking volumes: - ./docker-mounts/checkpoints:/checkpoints - ./docker-mounts/savepoints:/savepoints taskmanager: - platform: linux/amd64 + container_name: demo.flink-taskmanager profiles: [statefun,all] depends_on: - jobmanager @@ -85,7 +86,7 @@ services: context: . command: > taskmanager - -D "jobmanager.rpc.address=jobmanager" + -D "jobmanager.rpc.address=demo.flink-jobmanager" -D "state.savepoints.dir=file:///savepoints" -D "state.checkpoints.dir=file:///checkpoints" entrypoint: /entrypoint.sh @@ -99,17 +100,17 @@ services: - AWS_REGION=us-east-1 - AWS_ACCESS_KEY_ID=example-access-key-id - AWS_SECRET_ACCESS_KEY=example-secret-access-key - - AWS_ENDPOINT=https://host.docker.internal:4566 + - AWS_ENDPOINT=https://demo.localhost.localstack.cloud:4566 - AWS_CBOR_DISABLE=true - USE_ENHANCED_FANOUT=false # Disable enhanced fanout for local development, there seems to be a bug in localstack - - FLINK_ENV_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5066 -Dcom.amazonaws.sdk.disableCertChecking + - FLINK_ENV_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5066 #-Dcom.amazonaws.sdk.disableCertChecking volumes: - ./docker-mounts/checkpoints:/checkpoints - ./docker-mounts/savepoints:/savepoints send-events: image: amazon/aws-cli - platform: linux/amd64 + container_name: demo.send-events-to-ingress profiles: [send-events,all] volumes: - /var/run/docker.sock:/var/run/docker.sock @@ -124,7 +125,7 @@ services: grep -v test.action /test-resources/product-cart-integration-test-events.jsonl | while read line; do partkey=$$(echo $$line | md5sum | awk '{print $$1}') data=$$(echo $$line | base64 -w 0) - cmd=\"aws --endpoint-url=http://localstack:4566 kinesis put-record --stream-name example-ingress-stream --partition-key $$partkey --data $$data\" + cmd=\"aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis put-record --stream-name example-ingress-stream --partition-key $$partkey --data $$data\" echo $$cmd eval $$cmd done @@ -132,7 +133,7 @@ services: get-egress-events: image: amazon/aws-cli - platform: linux/amd64 + container_name: demo.get-events-from-egress profiles: [get-egress-events,all] volumes: - /var/run/docker.sock:/var/run/docker.sock @@ -145,9 +146,9 @@ services: command: > " yum install --quiet -y jq - shard_id=$$(aws --endpoint-url=http://localstack:4566 kinesis list-shards --stream-name example-egress-stream | jq -crM .Shards[0].ShardId) - shard_iterator=$$(aws --endpoint-url=http://localstack:4566 kinesis get-shard-iterator --shard-id $$shard_id --shard-iterator-type TRIM_HORIZON --stream-name example-egress-stream | jq -crM .ShardIterator) - for encoded_data in $$(aws --endpoint-url=http://localstack:4566 kinesis get-records --shard-iterator $$shard_iterator | jq -crM .Records[].Data); do + shard_id=$$(aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis list-shards --stream-name example-egress-stream | jq -crM .Shards[0].ShardId) + shard_iterator=$$(aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis get-shard-iterator --shard-id $$shard_id --shard-iterator-type TRIM_HORIZON --stream-name example-egress-stream | jq -crM .ShardIterator) + for encoded_data in $$(aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis get-records --shard-iterator $$shard_iterator | jq -crM .Records[].Data); do echo $$encoded_data | base64 -d | jq . done " diff --git a/pom.xml b/pom.xml index 16beac3..3dbcc9c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,6 +5,12 @@ 4.0.0 + + io.micronaut.platform + micronaut-parent + 4.7.6 + + com.example.statefun-java my-stateful-functions-embedded-java 3.3.0 @@ -19,12 +25,20 @@ 3.3.0.1-1.18 1.18.1 3.25.6 - 11 + 17 ${java.version} ${java.version} true 2.20.162 1.2.0 + + 17 + 17 + 4.7.6 + netty + false + com.example.aot.generated + com.example.Application @@ -52,18 +66,58 @@ apache-client + + software.amazon.awssdk + regions + - - org.springframework - spring-context - 5.3.19 + software.amazon.awssdk + auth - org.springframework.boot - spring-boot-starter - 2.4.2 + software.amazon.awssdk + opensearch + + + + + software.amazon.awssdk + sts + + + + + io.micronaut.serde + micronaut-serde-jackson + compile + + + ch.qos.logback + logback-classic + runtime + + + io.micronaut + micronaut-http-client + test + + + io.micronaut.test + micronaut-test-junit5 + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test @@ -127,7 +181,6 @@ ${protobuf.version} - org.apache.flink @@ -186,17 +239,9 @@ - org.springframework.boot - spring-boot-starter-test - test - 2.4.2 - - - - junit - junit - 4.13.1 - test + org.apache.flink + statefun-flink-state-processor + ${statefun.version} @@ -211,6 +256,16 @@ flink-test-utils ${flink.version} test + + + junit + junit + + + org.junit.vintage + junit-vintage-engine + + org.apache.flink @@ -248,12 +303,45 @@ + + io.micronaut.maven + micronaut-maven-plugin + + aot-jar.properties + + + + org.apache.maven.plugins + maven-enforcer-plugin + org.apache.maven.plugins maven-compiler-plugin 11 11 + + + io.micronaut + micronaut-http-validation + ${micronaut.core.version} + + + io.micronaut.serde + micronaut-serde-processor + ${micronaut.serialization.version} + + + io.micronaut + micronaut-inject + + + + + + -Amicronaut.processing.group=com.example + -Amicronaut.processing.module=my-stateful-functions-embedded-java + diff --git a/src/main/java/com/example/stateful_functions/ObjectMapperFactory.java b/src/main/java/com/example/stateful_functions/ObjectMapperFactory.java new file mode 100644 index 0000000..2c418e6 --- /dev/null +++ b/src/main/java/com/example/stateful_functions/ObjectMapperFactory.java @@ -0,0 +1,14 @@ +package com.example.stateful_functions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micronaut.context.annotation.Factory; +import jakarta.inject.Singleton; + +@Factory +public class ObjectMapperFactory { + + @Singleton + public ObjectMapper objectMapper() { + return new ObjectMapper(); + } +} diff --git a/src/main/java/com/example/stateful_functions/SpringModule.java b/src/main/java/com/example/stateful_functions/StatefulFunctionsConfigurator.java similarity index 54% rename from src/main/java/com/example/stateful_functions/SpringModule.java rename to src/main/java/com/example/stateful_functions/StatefulFunctionsConfigurator.java index ce92a17..697d846 100644 --- a/src/main/java/com/example/stateful_functions/SpringModule.java +++ b/src/main/java/com/example/stateful_functions/StatefulFunctionsConfigurator.java @@ -4,34 +4,27 @@ import com.example.stateful_functions.function.FunctionProvider; import com.example.stateful_functions.ingress.IngressSpecs; import com.example.stateful_functions.router.MessageRouter; -import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; import java.util.Map; -@Component -@Configuration -@ComponentScan(basePackages = "com.example.stateful_functions") -public class SpringModule { +@Singleton +public class StatefulFunctionsConfigurator { - private static Logger LOG = LoggerFactory.getLogger(SpringModule.class); + private static Logger LOG = LoggerFactory.getLogger(StatefulFunctionsConfigurator.class); - @Autowired + @Inject MessageRouter messageRouter; - @Autowired + @Inject FunctionProvider functionProvider; - public SpringModule() { } + public StatefulFunctionsConfigurator() { } public void configure(Map globalConfiguration, StatefulFunctionModule.Binder binder) { // bind the default ingress to the system along with the router @@ -45,11 +38,4 @@ public void configure(Map globalConfiguration, StatefulFunctionM functionProvider.bindFunctions(binder); } - @Bean - @Scope("singleton") - public ObjectMapper getObjectMapper() { - ObjectMapper objectMapper = new ObjectMapper(); - // Configure objectMapper has needed here - return objectMapper; - } } diff --git a/src/main/java/com/example/stateful_functions/StatefulFunctionsModule.java b/src/main/java/com/example/stateful_functions/StatefulFunctionsModule.java index 3233f35..f60d9b1 100644 --- a/src/main/java/com/example/stateful_functions/StatefulFunctionsModule.java +++ b/src/main/java/com/example/stateful_functions/StatefulFunctionsModule.java @@ -19,10 +19,8 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - +import io.micronaut.context.ApplicationContext; import java.util.Map; public final class StatefulFunctionsModule implements StatefulFunctionModule { @@ -37,9 +35,9 @@ public static void setApplicationContext(ApplicationContext applicationContext) @Override public void configure(Map globalConfiguration, Binder binder) { if (applicationContext == null) { - applicationContext = new AnnotationConfigApplicationContext(SpringModule.class); + applicationContext = io.micronaut.context.ApplicationContext.run(); } - SpringModule module = applicationContext.getBean(SpringModule.class); + StatefulFunctionsConfigurator module = applicationContext.getBean(StatefulFunctionsConfigurator.class); module.configure(globalConfiguration, binder); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); diff --git a/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventDataAccess.java b/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventDataAccess.java index eca91b9..06d57cd 100644 --- a/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventDataAccess.java +++ b/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventDataAccess.java @@ -5,29 +5,28 @@ import com.example.stateful_functions.cloudevents.data.ProductEventDetails; import com.example.stateful_functions.cloudevents.data.internal.FunctionSubscriptionDetails; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import io.cloudevents.CloudEvent; import io.cloudevents.core.CloudEventUtils; import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.jackson.PojoCloudEventDataMapper; -import org.apache.flink.annotation.VisibleForTesting; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; -@Component +@Singleton public class ExampleCloudEventDataAccess { - @Autowired - ObjectMapper objectMapper; + @Inject + public ObjectMapper objectMapper; public ExampleCloudEventDataAccess() { } @VisibleForTesting - public ExampleCloudEventDataAccess(ObjectMapper objectMapper) { + public void setObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } - public ObjectMapper getObjectMapper() { return objectMapper; } diff --git a/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventJsonFormat.java b/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventJsonFormat.java index 3d917be..c0bb1c5 100644 --- a/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventJsonFormat.java +++ b/src/main/java/com/example/stateful_functions/cloudevents/ExampleCloudEventJsonFormat.java @@ -4,13 +4,13 @@ import io.cloudevents.core.format.EventFormat; import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.jackson.JsonFormat; +import jakarta.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; -@Component +@Singleton public class ExampleCloudEventJsonFormat { private static Logger LOG = LoggerFactory.getLogger(ExampleCloudEventJsonFormat.class); diff --git a/src/main/java/com/example/stateful_functions/function/AbstractStatefulFunction.java b/src/main/java/com/example/stateful_functions/function/AbstractStatefulFunction.java index 825a778..3e16f1a 100644 --- a/src/main/java/com/example/stateful_functions/function/AbstractStatefulFunction.java +++ b/src/main/java/com/example/stateful_functions/function/AbstractStatefulFunction.java @@ -11,12 +11,12 @@ import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.jackson.JsonCloudEventData; +import jakarta.inject.Inject; import org.apache.flink.statefun.sdk.Context; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.net.URI; import java.time.OffsetDateTime; @@ -29,10 +29,10 @@ public abstract class AbstractStatefulFunction implements StatefulFunction { public abstract FunctionType getFunctionType(); - @Autowired + @Inject protected ExampleCloudEventJsonFormat cloudEventJsonFormat; - @Autowired + @Inject protected ExampleCloudEventDataAccess cloudEventDataAccess; @Override diff --git a/src/main/java/com/example/stateful_functions/function/FunctionProvider.java b/src/main/java/com/example/stateful_functions/function/FunctionProvider.java index 583f4ab..9006285 100644 --- a/src/main/java/com/example/stateful_functions/function/FunctionProvider.java +++ b/src/main/java/com/example/stateful_functions/function/FunctionProvider.java @@ -1,5 +1,9 @@ package com.example.stateful_functions.function; +import io.micronaut.context.ApplicationContext; +import jakarta.annotation.PostConstruct; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.StatefulFunction; @@ -7,48 +11,41 @@ import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.config.BeanDefinition; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider; -import org.springframework.core.type.filter.AnnotationTypeFilter; -import org.springframework.stereotype.Component; +import java.util.Collection; import java.util.HashMap; import java.util.Map; -@Component -public class FunctionProvider implements StatefulFunctionProvider, ApplicationContextAware, InitializingBean { +@Singleton +public class FunctionProvider implements StatefulFunctionProvider { private static final Logger LOG = LoggerFactory.getLogger(FunctionProvider.class); private static final String STATEFUN_BASE_PACKAGE = "com.example.stateful_functions"; - private ApplicationContext applicationContext; + @Inject + ApplicationContext applicationContext; - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } private final Map functionsByType = new HashMap<>(); - @Override + @PostConstruct public void afterPropertiesSet() throws Exception { - ClassPathScanningCandidateComponentProvider componentProvider = new ClassPathScanningCandidateComponentProvider(false); - componentProvider.addIncludeFilter(new AnnotationTypeFilter(StatefunFunction.class)); - for (BeanDefinition bean : componentProvider.findCandidateComponents(STATEFUN_BASE_PACKAGE)) { + Collection functions = applicationContext.getBeansOfType(StatefulFunction.class); + + for (StatefulFunction function : functions) { - Class functionClass = Class.forName(bean.getBeanClassName()); + Class functionClass = function.getClass(); try { functionsByType.put((FunctionType) functionClass.getField("FUNCTION_TYPE").get(null), functionClass); } catch (Exception x) { - String message = "Can't access required static FunctionType field FUNCTION_TYPE in " + bean.getBeanClassName(); + String message = "Can't access required static FunctionType field FUNCTION_TYPE in " + functionClass.getName(); LOG.error(message); throw new ReflectiveOperationException(message, x); } diff --git a/src/main/java/com/example/stateful_functions/function/StatefunFunction.java b/src/main/java/com/example/stateful_functions/function/StatefunFunction.java index 8c09596..1387823 100644 --- a/src/main/java/com/example/stateful_functions/function/StatefunFunction.java +++ b/src/main/java/com/example/stateful_functions/function/StatefunFunction.java @@ -1,7 +1,6 @@ package com.example.stateful_functions.function; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; +import io.micronaut.context.annotation.Prototype; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -13,7 +12,6 @@ */ @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.TYPE}) -@Component -@Scope("prototype") +@Prototype public @interface StatefunFunction { } diff --git a/src/main/java/com/example/stateful_functions/router/AbstractForwarder.java b/src/main/java/com/example/stateful_functions/router/AbstractForwarder.java index 28511fd..bfc6305 100644 --- a/src/main/java/com/example/stateful_functions/router/AbstractForwarder.java +++ b/src/main/java/com/example/stateful_functions/router/AbstractForwarder.java @@ -4,18 +4,18 @@ import com.example.stateful_functions.cloudevents.ExampleCloudEventJsonFormat; import com.example.stateful_functions.protobuf.ExampleProtobuf; import io.cloudevents.CloudEvent; +import jakarta.inject.Inject; import org.apache.flink.statefun.sdk.FunctionType; import org.apache.flink.statefun.sdk.io.Router; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; public abstract class AbstractForwarder implements Forwarder { - @Autowired + @Inject ExampleCloudEventJsonFormat cloudEventJsonFormat; - @Autowired + @Inject protected ExampleCloudEventDataAccess cloudEventDataAccess; protected Logger getLogger() { diff --git a/src/main/java/com/example/stateful_functions/router/MessageRouter.java b/src/main/java/com/example/stateful_functions/router/MessageRouter.java index 0bbe021..ab61527 100644 --- a/src/main/java/com/example/stateful_functions/router/MessageRouter.java +++ b/src/main/java/com/example/stateful_functions/router/MessageRouter.java @@ -3,25 +3,25 @@ import com.example.stateful_functions.cloudevents.ExampleCloudEventJsonFormat; import com.example.stateful_functions.protobuf.ExampleProtobuf; import io.cloudevents.CloudEvent; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.apache.flink.statefun.sdk.io.Router; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import java.util.List; -@Component +@Singleton public final class MessageRouter implements Router { private static final Logger LOG = LoggerFactory.getLogger(MessageRouter.class); // Spring automatically collects all the of Forwarders and provides them as a list here - @Autowired + @Inject private List forwarders; - @Autowired + @Inject ExampleCloudEventJsonFormat cloudEventJsonFormat; diff --git a/src/main/java/com/example/stateful_functions/router/forward/CartEventToCartFunction.java b/src/main/java/com/example/stateful_functions/router/forward/CartEventToCartFunction.java index 8e7f60f..660c611 100644 --- a/src/main/java/com/example/stateful_functions/router/forward/CartEventToCartFunction.java +++ b/src/main/java/com/example/stateful_functions/router/forward/CartEventToCartFunction.java @@ -5,10 +5,10 @@ import com.example.stateful_functions.protobuf.ExampleProtobuf; import com.example.stateful_functions.router.AbstractForwarder; import io.cloudevents.CloudEvent; +import jakarta.inject.Singleton; import org.apache.flink.statefun.sdk.io.Router; -import org.springframework.stereotype.Component; -@Component +@Singleton public class CartEventToCartFunction extends AbstractForwarder { @Override diff --git a/src/main/java/com/example/stateful_functions/router/forward/ProductEventToProductFunction.java b/src/main/java/com/example/stateful_functions/router/forward/ProductEventToProductFunction.java index 14e0c76..0169936 100644 --- a/src/main/java/com/example/stateful_functions/router/forward/ProductEventToProductFunction.java +++ b/src/main/java/com/example/stateful_functions/router/forward/ProductEventToProductFunction.java @@ -5,10 +5,10 @@ import com.example.stateful_functions.protobuf.ExampleProtobuf; import com.example.stateful_functions.router.AbstractForwarder; import io.cloudevents.CloudEvent; +import jakarta.inject.Singleton; import org.apache.flink.statefun.sdk.io.Router; -import org.springframework.stereotype.Component; -@Component +@Singleton public class ProductEventToProductFunction extends AbstractForwarder { @Override diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 77cc591..9ae81e3 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1 @@ -# Reduce noise, primarily for tests -spring.main.banner-mode=off app.version=1.0 diff --git a/src/test/java/com/example/stateful_functions/AbstractStatefulFunctionTest.java b/src/test/java/com/example/stateful_functions/AbstractStatefulFunctionTest.java index 14b0320..13d8b5f 100644 --- a/src/test/java/com/example/stateful_functions/AbstractStatefulFunctionTest.java +++ b/src/test/java/com/example/stateful_functions/AbstractStatefulFunctionTest.java @@ -1,16 +1,7 @@ package com.example.stateful_functions; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.context.annotation.Import; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; -@RunWith(SpringRunner.class) -@SpringBootTest( - webEnvironment = SpringBootTest.WebEnvironment.NONE, - classes = {SpringModule.class}) -@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) -@Import(SpringTestConfiguration.class) +@MicronautTest public abstract class AbstractStatefulFunctionTest { } diff --git a/src/test/java/com/example/stateful_functions/SpringTestConfiguration.java b/src/test/java/com/example/stateful_functions/SpringTestConfiguration.java index 5073821..5d07c0b 100644 --- a/src/test/java/com/example/stateful_functions/SpringTestConfiguration.java +++ b/src/test/java/com/example/stateful_functions/SpringTestConfiguration.java @@ -1,13 +1,11 @@ package com.example.stateful_functions; -import org.springframework.beans.BeansException; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; +import io.micronaut.context.ApplicationContext; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; -public class SpringTestConfiguration implements ApplicationContextAware { - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - StatefulFunctionsModule.setApplicationContext(applicationContext); - } +@Singleton +public class SpringTestConfiguration { + @Inject + ApplicationContext applicationContext; } diff --git a/src/test/java/com/example/stateful_functions/cloudevents/EventDetailsTest.java b/src/test/java/com/example/stateful_functions/cloudevents/EventDetailsTest.java index ddd1177..175e3f0 100644 --- a/src/test/java/com/example/stateful_functions/cloudevents/EventDetailsTest.java +++ b/src/test/java/com/example/stateful_functions/cloudevents/EventDetailsTest.java @@ -10,7 +10,8 @@ import io.cloudevents.core.provider.EventFormatProvider; import io.cloudevents.jackson.JsonCloudEventData; import io.cloudevents.jackson.JsonFormat; -import org.junit.Test; +import org.junit.jupiter.api.Test; + import java.math.BigDecimal; import java.net.URI; @@ -18,9 +19,9 @@ import java.time.ZoneOffset; import java.util.UUID; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class EventDetailsTest { @@ -43,7 +44,8 @@ public void run() { ObjectMapper objectMapper = new ObjectMapper(); EventFormat eventFormat = EventFormatProvider.getInstance().resolveFormat(JsonFormat.CONTENT_TYPE); - ExampleCloudEventDataAccess cloudEventDataAccess = new ExampleCloudEventDataAccess(objectMapper); + ExampleCloudEventDataAccess cloudEventDataAccess = new ExampleCloudEventDataAccess(); + cloudEventDataAccess.setObjectMapper(objectMapper); ProductEventDetails productDetails = new ProductEventDetails.Builder() .id(WIDGET_PRODUCT_ID) diff --git a/src/test/java/com/example/stateful_functions/integration/ProductAndCartFunctionsIntegrationTest.java b/src/test/java/com/example/stateful_functions/integration/ProductAndCartFunctionsIntegrationTest.java index b179a2a..ba43e79 100644 --- a/src/test/java/com/example/stateful_functions/integration/ProductAndCartFunctionsIntegrationTest.java +++ b/src/test/java/com/example/stateful_functions/integration/ProductAndCartFunctionsIntegrationTest.java @@ -8,16 +8,16 @@ import com.example.stateful_functions.cloudevents.data.ProductAvailability; import com.example.stateful_functions.protobuf.ExampleProtobuf; import io.cloudevents.CloudEvent; -import org.junit.Test; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.math.BigDecimal; import java.util.List; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; /* * Test the interaction between the product and cart stateful functions. Here, we are sending product and cart action @@ -26,10 +26,10 @@ public class ProductAndCartFunctionsIntegrationTest extends StatefulFunctionIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(ProductAndCartFunctionsIntegrationTest.class); - @Autowired + @Inject ExampleCloudEventJsonFormat cloudEventJsonFormat; - @Autowired + @Inject ExampleCloudEventDataAccess cloudEventDataAccess; @@ -45,7 +45,7 @@ public void run() throws Exception { .reduce((first, second) -> second) .orElse(null); - assertNotNull("cart status event was not sent", cartStatusEvent); + assertNotNull(cartStatusEvent, "cart status event was not sent"); CartStatusEventDetails cartStatusEventDetails = cloudEventDataAccess.toCartStatusEventDetails(cartStatusEvent); assertEquals(1, cartStatusEventDetails.getCartItemStatuses().size()); CartItemStatusDetails itemStatus = cartStatusEventDetails.getCartItemStatuses().get(0); diff --git a/src/test/java/com/example/stateful_functions/integration/StatefulFunctionIntegrationTest.java b/src/test/java/com/example/stateful_functions/integration/StatefulFunctionIntegrationTest.java index 20e9874..d2185d5 100644 --- a/src/test/java/com/example/stateful_functions/integration/StatefulFunctionIntegrationTest.java +++ b/src/test/java/com/example/stateful_functions/integration/StatefulFunctionIntegrationTest.java @@ -5,16 +5,19 @@ import com.example.stateful_functions.ingress.IngressSpecs; import com.example.stateful_functions.protobuf.ExampleProtobuf; import com.example.stateful_functions.util.TestMessageSource; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; import org.apache.flink.statefun.flink.harness.Harness; import org.apache.flink.statefun.flink.harness.io.SerializableConsumer; -import org.springframework.beans.factory.annotation.Autowired; + import java.util.ArrayList; import java.util.List; +@MicronautTest public abstract class StatefulFunctionIntegrationTest extends AbstractStatefulFunctionTest { - @Autowired + @Inject TestMessageSource testMessageSource; static final List egressEvents = new ArrayList<>(); @@ -27,7 +30,8 @@ protected List executeTestHarnessWith(String messageSo Harness harness = new Harness() .withFlinkSourceFunction(IngressSpecs.INGRESS_ID, testMessageSource) - .withConsumingEgress(EgressSpecs.ID, (SerializableConsumer) envelope -> egressEvents.add(envelope)); + .withConsumingEgress(EgressSpecs.ID, (SerializableConsumer) envelope -> egressEvents.add(envelope)) + .withConfiguration("pipeline.closure-cleaner-level","NONE"); harness.start(); diff --git a/src/test/java/com/example/stateful_functions/isolation/ProductStatefulFunctionTest.java b/src/test/java/com/example/stateful_functions/isolation/ProductStatefulFunctionTest.java index ff901f8..8fa0885 100644 --- a/src/test/java/com/example/stateful_functions/isolation/ProductStatefulFunctionTest.java +++ b/src/test/java/com/example/stateful_functions/isolation/ProductStatefulFunctionTest.java @@ -4,15 +4,15 @@ import com.example.stateful_functions.function.product.ProductStatefulFunction; import com.example.stateful_functions.protobuf.ExampleProtobuf; import com.example.stateful_functions.util.TestMessageLoader; +import jakarta.inject.Inject; import org.apache.flink.statefun.testutils.function.FunctionTestHarness; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.util.Iterator; -import static org.junit.Assert.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotNull; /* * Test of ProductDetailsStatefulFunction @@ -22,10 +22,10 @@ public class ProductStatefulFunctionTest extends StatefulFunctionIsolationTest { private static final Logger LOG = LoggerFactory.getLogger(ProductStatefulFunctionTest.class); - @Autowired + @Inject ProductStatefulFunction statefulFunction; - @Autowired + @Inject TestMessageLoader testMessageLoader; @Test diff --git a/src/test/java/com/example/stateful_functions/state_types/TypeTesterTest.java b/src/test/java/com/example/stateful_functions/state_types/TypeTesterTest.java index 1c43ddb..a0f5906 100644 --- a/src/test/java/com/example/stateful_functions/state_types/TypeTesterTest.java +++ b/src/test/java/com/example/stateful_functions/state_types/TypeTesterTest.java @@ -1,7 +1,9 @@ package com.example.stateful_functions.state_types; -import com.example.stateful_functions.AbstractStatefulFunctionTest; import com.example.stateful_functions.function.FunctionProvider; +import io.micronaut.context.ApplicationContext; +import io.micronaut.test.extensions.junit5.annotation.MicronautTest; +import jakarta.inject.Inject; import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.GenericTypeInfo; @@ -12,34 +14,27 @@ import org.apache.flink.statefun.sdk.StatefulFunction; import org.apache.flink.statefun.sdk.state.PersistedTable; import org.apache.flink.statefun.sdk.state.PersistedValue; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationContext; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; + +import java.lang.reflect.*; import java.util.*; import java.util.stream.Collectors; -import static org.junit.Assert.*; -import static org.junit.Assert.assertTrue; - +import static org.junit.jupiter.api.Assertions.*; /** * Ensures that the stateful function state model classes meet the requirements for Kyro serialization */ -public class TypeTesterTest extends AbstractStatefulFunctionTest { +@MicronautTest +public class TypeTesterTest { private static final Logger LOG = LoggerFactory.getLogger(TypeTesterTest.class); - @Autowired - ApplicationContext applicationContext; + @Inject + public ApplicationContext applicationContext; @Test public void run() throws Exception { @@ -107,7 +102,7 @@ private static void check(Class modelClass, Set hits) modelObject = modelClass.getConstructor().newInstance(); } } catch (Exception ex) { - assertTrue(modelClass.getSimpleName() + " needs parameterless constructor", false); + fail(modelClass.getSimpleName() + " needs parameterless constructor"); } TypeInformation flinkTypeInfo = TypeInformation.of(modelClass); @@ -117,7 +112,7 @@ private static void check(Class modelClass, Set hits) if (modelObject instanceof Collection) return; - assertTrue(modelClass.getSimpleName() + " must be A POJO, found " + flinkTypeInfo, flinkTypeInfo instanceof PojoTypeInfo); + assertTrue(flinkTypeInfo instanceof PojoTypeInfo, modelClass.getSimpleName() + " must be A POJO, found " + flinkTypeInfo); PojoTypeInfo pojoFlinkTypeInfo = (PojoTypeInfo)flinkTypeInfo; @@ -144,11 +139,11 @@ private static void check(Class modelClass, Set hits) toCheck.add(flinkFieldTypeInfo); // Is the field in our class compatible with flink's type info entry for our field - assertTrue(c.getSimpleName() + "." + classFieldName + " is not compatible with " + flinkFieldTypeInfo, isCompatible(modelObject, modelClass, modelClassField, modelFieldClass, flinkFieldTypeInfo, toCheck)); - assertTrue(c.getSimpleName() + "." + classFieldName + " needs correctly typed getters and setters.", hasGettersSetters(modelClass, modelFieldClass, modelClassField)); + assertTrue(isCompatible(modelObject, modelClass, modelClassField, modelFieldClass, flinkFieldTypeInfo, toCheck), c.getSimpleName() + "." + classFieldName + " is not compatible with " + flinkFieldTypeInfo); + assertTrue(hasGettersSetters(modelClass, modelFieldClass, modelClassField), c.getSimpleName() + "." + classFieldName + " needs correctly typed getters and setters."); if (!flinkFieldTypeInfo.isBasicType()) { - assertFalse(c.getSimpleName() + "." + classFieldName + " must not be generic, found " + flinkFieldTypeInfo, flinkFieldTypeInfo instanceof GenericTypeInfo); + assertFalse(flinkFieldTypeInfo instanceof GenericTypeInfo, c.getSimpleName() + "." + classFieldName + " must not be generic, found " + flinkFieldTypeInfo); if (flinkFieldTypeInfo instanceof MapTypeInfo) { MapTypeInfo mapTypeInfo = (MapTypeInfo) flinkFieldTypeInfo; diff --git a/src/test/java/com/example/stateful_functions/util/TestMessageLoader.java b/src/test/java/com/example/stateful_functions/util/TestMessageLoader.java index 441a908..f977075 100644 --- a/src/test/java/com/example/stateful_functions/util/TestMessageLoader.java +++ b/src/test/java/com/example/stateful_functions/util/TestMessageLoader.java @@ -2,9 +2,10 @@ import com.example.stateful_functions.protobuf.ExampleProtobuf; import com.google.common.io.Resources; +import jakarta.inject.Singleton; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; + import java.io.BufferedReader; import java.io.Serializable; @@ -13,7 +14,7 @@ import java.util.ArrayList; import java.util.List; -@Component +@Singleton public class TestMessageLoader implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(TestMessageLoader.class); diff --git a/src/test/java/com/example/stateful_functions/util/TestMessageSource.java b/src/test/java/com/example/stateful_functions/util/TestMessageSource.java index 212a2d7..3ee7cce 100644 --- a/src/test/java/com/example/stateful_functions/util/TestMessageSource.java +++ b/src/test/java/com/example/stateful_functions/util/TestMessageSource.java @@ -3,24 +3,26 @@ import com.example.stateful_functions.cloudevents.ExampleCloudEventJsonFormat; import com.example.stateful_functions.protobuf.ExampleProtobuf; import io.cloudevents.CloudEvent; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.apache.commons.lang.StringUtils; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; + import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -@Component +@Singleton public class TestMessageSource implements SourceFunction, Serializable { private static final Logger LOG = LoggerFactory.getLogger(TestMessageSource.class); - @Autowired TestMessageLoader testMessageLoader; + @Inject + TestMessageLoader testMessageLoader; final List envelopes = new ArrayList<>();