11 changes: 6 additions & 5 deletions Dockerfile
@@ -1,11 +1,12 @@
# The parent Flink image (flink:1.18.1-java11) only contains the JRE (openjdk:11-jre), and it is missing key
# diagnostic tools. This multistage build will overwrite the JRE with the JDK from openjdk:11
# The parent Flink image (flink:1.18.1-java17) only contains the JRE (openjdk:17-jre), and it is missing key
# diagnostic tools. This multistage build will overwrite the JRE with the JDK from openjdk:17
# See https://docs.docker.com/develop/develop-images/multistage-build/
FROM --platform=linux/amd64 openjdk:11 AS jdk_image
FROM --platform=linux/amd64 flink:1.18.1-java11
# Add --platform=linux/amd64 to the FROM commands below when on Apple silicon
#FROM --platform=linux/amd64 openjdk:17 as jdk_image
FROM flink:1.18.1-java17

# Copy the JDK from the jdk_image
COPY --from=jdk_image /usr/local/openjdk-11 /opt/java/openjdk/
#COPY --from=jdk_image /usr/java/openjdk-17 /opt/java/openjdk

RUN sed -i -e 's/^.*networkaddress.cache.ttl=.*$/networkaddress.cache.ttl=30/g' /opt/java/openjdk/conf/security/java.security
RUN sed -i -e 's/^.*networkaddress.cache.negative.ttl=.*$/networkaddress.cache.negative.ttl=10/g' /opt/java/openjdk/conf/security/java.security
49 changes: 25 additions & 24 deletions docker-compose.yml
@@ -1,25 +1,24 @@
services:
localstack:
image: localstack/localstack:3.0.2
platform: linux/amd64
image: localstack/localstack:3.1.0
container_name: demo.localhost.localstack.cloud
profiles: [kinesis,statefun,all]
ports:
- "4566:4566"
environment:
- SERVICES=kinesis
- HOSTNAME=localstack
- HOSTNAME=demo.localhost.localstack.cloud
- AWS_REGION=us-east-1
- AWS_ACCESS_KEY_ID=example-access-key-id
- AWS_SECRET_ACCESS_KEY=example-secret-access-key
- AWS_ENDPOINT=https://localstack:4566
- AWS_ENDPOINT_URL=https://localstack:4566
- AWS_ENDPOINT=https://demo.localhost.localstack.cloud:4566
- AWS_ENDPOINT_URL=https://demo.localhost.localstack.cloud:4566
- DEBUG=0
- KINESIS_ERROR_PROBABILITY=0.0
- DOCKER_HOST=unix:///var/run/docker.sock

create-streams:
image: amazon/aws-cli
platform: linux/amd64
profiles: [kinesis,statefun,all]
depends_on:
- localstack
@@ -35,15 +34,15 @@ services:
"
set -x
echo Creating ingress stream
until aws --endpoint-url=http://localstack:4566 kinesis create-stream --stream-name example-ingress-stream --shard-count 1; do sleep 1; done
until aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis create-stream --stream-name example-ingress-stream --shard-count 1; do sleep 1; done
echo Creating egress stream
aws --endpoint-url=http://localstack:4566 kinesis create-stream --stream-name example-egress-stream --shard-count 1
aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis create-stream --stream-name example-egress-stream --shard-count 1
echo Listing streams
aws --endpoint-url=http://localstack:4566 kinesis list-streams
aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis list-streams
"

jobmanager:
platform: linux/amd64
container_name: demo.flink-jobmanager
profiles: [statefun,all]
depends_on:
- create-streams
@@ -52,8 +51,8 @@ services:
# to restore from savepoint, add this to the command: --fromSavepoint file:///savepoints/savepoint-xxxx-yyyyyy
command: >
standalone-job
-D "rest.address=jobmanager"
-D "jobmanager.rpc.address=jobmanager"
-D "rest.address=demo.flink-jobmanager"
-D "jobmanager.rpc.address=demo.flink-jobmanager"
-D "state.savepoints.dir=file:///savepoints"
-D "state.checkpoints.dir=file:///checkpoints"
--job-classname org.apache.flink.statefun.flink.core.StatefulFunctionsJob
@@ -62,30 +61,32 @@ services:
expose:
- "6123"
ports:
- "5065:5065"
- "8081:8081"
environment:
- IS_LOCAL_DEV=true
- AWS_REGION=us-east-1
- AWS_ACCESS_KEY_ID=example-access-key-id
- AWS_SECRET_ACCESS_KEY=example-secret-access-key
- AWS_ENDPOINT=https://host.docker.internal:4566
- AWS_ENDPOINT=https://demo.localhost.localstack.cloud:4566
- AWS_CBOR_DISABLE=true
- USE_ENHANCED_FANOUT=false # Disable enhanced fanout for local development; there seems to be a bug in LocalStack
- FLINK_ENV_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking
- FLINK_ENV_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5065
# - FLINK_ENV_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking
volumes:
- ./docker-mounts/checkpoints:/checkpoints
- ./docker-mounts/savepoints:/savepoints

taskmanager:
platform: linux/amd64
container_name: demo.flink-taskmanager
profiles: [statefun,all]
depends_on:
- jobmanager
build:
context: .
command: >
taskmanager
-D "jobmanager.rpc.address=jobmanager"
-D "jobmanager.rpc.address=demo.flink-jobmanager"
-D "state.savepoints.dir=file:///savepoints"
-D "state.checkpoints.dir=file:///checkpoints"
entrypoint: /entrypoint.sh
@@ -99,17 +100,17 @@ services:
- AWS_REGION=us-east-1
- AWS_ACCESS_KEY_ID=example-access-key-id
- AWS_SECRET_ACCESS_KEY=example-secret-access-key
- AWS_ENDPOINT=https://host.docker.internal:4566
- AWS_ENDPOINT=https://demo.localhost.localstack.cloud:4566
- AWS_CBOR_DISABLE=true
- USE_ENHANCED_FANOUT=false # Disable enhanced fanout for local development; there seems to be a bug in LocalStack
- FLINK_ENV_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5066 -Dcom.amazonaws.sdk.disableCertChecking
- FLINK_ENV_JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5066 #-Dcom.amazonaws.sdk.disableCertChecking
volumes:
- ./docker-mounts/checkpoints:/checkpoints
- ./docker-mounts/savepoints:/savepoints

send-events:
image: amazon/aws-cli
platform: linux/amd64
container_name: demo.send-events-to-ingress
profiles: [send-events,all]
volumes:
- /var/run/docker.sock:/var/run/docker.sock
@@ -124,15 +125,15 @@ services:
grep -v test.action /test-resources/product-cart-integration-test-events.jsonl | while read line; do
partkey=$$(echo $$line | md5sum | awk '{print $$1}')
data=$$(echo $$line | base64 -w 0)
cmd=\"aws --endpoint-url=http://localstack:4566 kinesis put-record --stream-name example-ingress-stream --partition-key $$partkey --data $$data\"
cmd=\"aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis put-record --stream-name example-ingress-stream --partition-key $$partkey --data $$data\"
echo $$cmd
eval $$cmd
done
"

get-egress-events:
image: amazon/aws-cli
platform: linux/amd64
container_name: demo.get-events-from-egress
profiles: [get-egress-events,all]
volumes:
- /var/run/docker.sock:/var/run/docker.sock
@@ -145,9 +146,9 @@ services:
command: >
"
yum install --quiet -y jq
shard_id=$$(aws --endpoint-url=http://localstack:4566 kinesis list-shards --stream-name example-egress-stream | jq -crM .Shards[0].ShardId)
shard_iterator=$$(aws --endpoint-url=http://localstack:4566 kinesis get-shard-iterator --shard-id $$shard_id --shard-iterator-type TRIM_HORIZON --stream-name example-egress-stream | jq -crM .ShardIterator)
for encoded_data in $$(aws --endpoint-url=http://localstack:4566 kinesis get-records --shard-iterator $$shard_iterator | jq -crM .Records[].Data); do
shard_id=$$(aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis list-shards --stream-name example-egress-stream | jq -crM .Shards[0].ShardId)
shard_iterator=$$(aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis get-shard-iterator --shard-id $$shard_id --shard-iterator-type TRIM_HORIZON --stream-name example-egress-stream | jq -crM .ShardIterator)
for encoded_data in $$(aws --endpoint-url=http://demo.localhost.localstack.cloud:4566 kinesis get-records --shard-iterator $$shard_iterator | jq -crM .Records[].Data); do
echo $$encoded_data | base64 -d | jq .
done
"
128 changes: 108 additions & 20 deletions pom.xml
@@ -5,6 +5,12 @@

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.micronaut.platform</groupId>
<artifactId>micronaut-parent</artifactId>
<version>4.7.6</version>
</parent>

<groupId>com.example.statefun-java</groupId>
<artifactId>my-stateful-functions-embedded-java</artifactId>
<version>3.3.0</version>
@@ -19,12 +25,20 @@
<statefun.version>3.3.0.1-1.18</statefun.version>
<flink.version>1.18.1</flink.version>
<protobuf.version>3.25.6</protobuf.version>
<java.version>11</java.version>
<java.version>17</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.deploy.skip>true</maven.deploy.skip>
<aws.java.sdk.version>2.20.162</aws.java.sdk.version>
<kda.runtime.version>1.2.0</kda.runtime.version>

<jdk.version>17</jdk.version>
<release.version>17</release.version>
<micronaut.version>4.7.6</micronaut.version>
<micronaut.runtime>netty</micronaut.runtime>
<micronaut.aot.enabled>false</micronaut.aot.enabled>
<micronaut.aot.packageName>com.example.aot.generated</micronaut.aot.packageName>
<exec.mainClass>com.example.Application</exec.mainClass>
</properties>

<dependencyManagement>
@@ -52,18 +66,58 @@
<artifactId>apache-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
</dependency>

<!-- Springboot -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.19</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.4.2</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>opensearch</artifactId>
</dependency>

<!-- AWS Security Token Service - required to connect to AWS services (Elasticsearch, OpenSearch, etc.) using
WebIdentityCredentials from the credentials provider chain -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>

<!-- Micronaut -->
<dependency>
<groupId>io.micronaut.serde</groupId>
<artifactId>micronaut-serde-jackson</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micronaut.test</groupId>
<artifactId>micronaut-test-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>


@@ -127,7 +181,6 @@
<version>${protobuf.version}</version>
</dependency>


<!-- Add IO dependencies here -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -186,17 +239,9 @@
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<version>2.4.2</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-state-processor</artifactId>
<version>${statefun.version}</version>
</dependency>

<dependency>
@@ -211,6 +256,16 @@
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -248,12 +303,45 @@
</extension>
</extensions>
<plugins>
<plugin>
<groupId>io.micronaut.maven</groupId>
<artifactId>micronaut-maven-plugin</artifactId>
<configuration>
<configFile>aot-jar.properties</configFile>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
<annotationProcessorPaths combine.children="append">
<path>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-validation</artifactId>
<version>${micronaut.core.version}</version>
</path>
<path>
<groupId>io.micronaut.serde</groupId>
<artifactId>micronaut-serde-processor</artifactId>
<version>${micronaut.serialization.version}</version>
<exclusions>
<exclusion>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-inject</artifactId>
</exclusion>
</exclusions>
</path>
</annotationProcessorPaths>
<compilerArgs>
<arg>-Amicronaut.processing.group=com.example</arg>
<arg>-Amicronaut.processing.module=my-stateful-functions-embedded-java</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
@@ -0,0 +1,14 @@
package com.example.stateful_functions;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.context.annotation.Factory;
import jakarta.inject.Singleton;

@Factory
public class ObjectMapperFactory {

@Singleton
public ObjectMapper objectMapper() {
return new ObjectMapper();
}
}
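
For reviewers less familiar with Micronaut's @Factory pattern: the bean method above registers a single shared Jackson ObjectMapper in the application context. Below is a minimal sketch of how such a bean could be consumed through constructor injection; the ExampleJsonService class and its toJson method are hypothetical illustrations, not part of this change.

package com.example.stateful_functions;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.inject.Singleton;

// Hypothetical consumer bean: Micronaut resolves the ObjectMapper produced by
// ObjectMapperFactory and supplies it to the constructor at startup.
@Singleton
public class ExampleJsonService {

    private final ObjectMapper objectMapper;

    public ExampleJsonService(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    // Serializes any value to a JSON string using the shared mapper.
    public String toJson(Object value) throws JsonProcessingException {
        return objectMapper.writeValueAsString(value);
    }
}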