Skip to content
32 changes: 19 additions & 13 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,21 +209,27 @@ jobs:
- "native"
- "standalone"
test:
- test_application_kubernetes_ha.sh
- test_application_operations.sh
- test_dynamic_config.sh
- test_dynamic_flink_conf.sh
- test_sessionjob_kubernetes_ha.sh
- test_sessionjob_operations.sh
- test_autoscaler.sh
- test_flink_operator_ha.sh
- test_snapshot.sh
- test_batch_job.sh
# - test_application_kubernetes_ha.sh
# - test_application_operations.sh
# - test_dynamic_config.sh
# - test_dynamic_flink_conf.sh
# - test_sessionjob_kubernetes_ha.sh
# - test_sessionjob_operations.sh
# - test_autoscaler.sh
# - test_flink_operator_ha.sh
# - test_snapshot.sh
# - test_batch_job.sh
- test_bluegreen_laststate.sh
- test_bluegreen_stateless.sh
exclude:
- mode: standalone
test: test_autoscaler.sh
# - mode: standalone
# test: test_autoscaler.sh
# - flink-version: v1_19
# test: test_snapshot.sh
- flink-version: v1_19
test: test_bluegreen_laststate.sh
- flink-version: v1_19
test: test_snapshot.sh
test: test_bluegreen_stateless.sh
uses: ./.github/workflows/e2e.yaml
with:
java-version: 17
Expand Down
20 changes: 13 additions & 7 deletions e2e-tests/data/bluegreen-laststate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ spec:
image: flink:1.20
flinkVersion: v1_20
flinkConfiguration:
rest.port: "8081"
# rest.port: "8081"
jobmanager.memory.jvm-metaspace.size: 96m
jobmanager.memory.jvm-overhead.min: 32m
taskmanager.memory.managed.size: 78m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.min: 32m
taskmanager.memory.network.min: 32m
execution.checkpointing.interval: "10s"
execution.checkpointing.storage: "filesystem"
state.backend.incremental: "true"
Expand All @@ -39,17 +45,17 @@ spec:
serviceAccount: flink
jobManager:
resource:
memory: 1G
cpu: 1
memory: 512m
cpu: 0.5
podTemplate:
spec:
containers:
- name: flink-main-container
resources:
requests:
ephemeral-storage: 2048Mi
ephemeral-storage: 1024Mi
limits:
ephemeral-storage: 2048Mi
ephemeral-storage: 1024Mi
volumeMounts:
- mountPath: /opt/flink/volume
name: flink-volume
Expand All @@ -59,8 +65,8 @@ spec:
claimName: flink-bg-laststate
taskManager:
resource:
memory: 2G
cpu: 1
memory: 512m
cpu: 0.5
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 1
Expand Down
17 changes: 12 additions & 5 deletions e2e-tests/data/bluegreen-stateless.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,24 @@ spec:
image: flink:1.20
flinkVersion: v1_20
flinkConfiguration:
rest.port: "8081"
# rest.port: "8081"
# taskmanager.memory.process.size: 768m
jobmanager.memory.jvm-metaspace.size: 96m
jobmanager.memory.jvm-overhead.min: 32m
taskmanager.memory.managed.size: 78m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.min: 32m
taskmanager.memory.network.min: 32m
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: 1G
cpu: 1
memory: 512m
cpu: 0.5
taskManager:
resource:
memory: 2G
cpu: 1
memory: 512m
cpu: 0.5
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 1
Expand Down
36 changes: 34 additions & 2 deletions e2e-tests/test_bluegreen_laststate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,47 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE ${T

#blue_job_id=$(kubectl get -oyaml flinkdep/basic-bluegreen-example-blue | yq '.status.jobStatus.jobId')

#kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"rest.port":"8082","state.checkpoints.num-retained":"6"}}}}}'
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"state.checkpoints.num-retained":"6"}}}}}'
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"rest.port":"8082","state.checkpoints.num-retained":"6"}}}}}'
#kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"state.checkpoints.num-retained":"6"}}}}}'
echo "Resource patched, giving a chance for the savepoint to be taken..."
sleep 10

jm_pod_name=$(get_jm_pod_name $BLUE_CLUSTER_ID)
echo "Inspecting savepoint directory..."
kubectl exec -it $jm_pod_name -- bash -c "ls -lt /opt/flink/volume/flink-sp/"


# Diagnostic loop: six rounds, 30 seconds apart. Each round prints the pod
# list, the green JobManager and TaskManager logs, the description of the green
# FlinkDeployment and finally its lifecycleState.
#
# jm_pod_name is already assigned from `get_jm_pod_name $BLUE_CLUSTER_ID` before
# this loop, so reusing it here would skip the green lookup and print the blue
# JobManager logs. The green JobManager pod is tracked in its own variable and
# jm_pod_name is left untouched.
green_jm_pod_name=""
tm_pod_name=""
for i in $(seq 1 6); do
    echo "====="
    echo "LISTING PODS:"
    kubectl get pods

    # Look the green JobManager pod up until it exists; keep the name afterwards.
    if [ -z "$green_jm_pod_name" ]; then
        green_jm_pod_name=$(kubectl get pods --selector="app=${GREEN_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
        echo "Set JM pod name:" "$green_jm_pod_name"
    fi
    echo "GETTING JM LOGS:"
    # Skip the call while the pod is missing: `kubectl logs` without a pod name
    # only produces a usage error.
    if [ -n "$green_jm_pod_name" ]; then
        kubectl logs "$green_jm_pod_name" -c flink-main-container
    else
        echo "(green JM pod not found yet)"
    fi

    echo "--==--"
    # Same lookup-once logic for the green TaskManager pod.
    if [ -z "$tm_pod_name" ]; then
        tm_pod_name=$(kubectl get pods --selector="app=${GREEN_CLUSTER_ID},component=taskmanager" -o jsonpath='{..metadata.name}')
        echo "Set TM pod name:" "$tm_pod_name"
    fi
    echo "GETTING TM LOGS:"
    if [ -n "$tm_pod_name" ]; then
        kubectl logs "$tm_pod_name" -c flink-main-container
    else
        echo "(green TM pod not found yet)"
    fi

    echo "--=EVs=--"
    kubectl describe flinkdep "$GREEN_CLUSTER_ID"
    # kubectl get events --field-selector involvedObject.kind=FlinkDeployment,involvedObject.name=$GREEN_CLUSTER_ID --sort-by=.metadata.creationTimestamp
    sleep 30

    # GREEN_APPLICATION_IDENTIFIER is left unquoted, as in the wait_for_status
    # calls of this script. NOTE(review): confirm it never relies on word splitting.
    status=$(kubectl get -oyaml $GREEN_APPLICATION_IDENTIFIER | yq '.status.lifecycleState')
    echo "==> lifecycleState after 30 secs: " "$status"
done


wait_for_status $GREEN_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
kubectl wait --for=delete deployment --timeout=${TIMEOUT}s --selector="app=${BLUE_CLUSTER_ID}"
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
Expand Down
36 changes: 34 additions & 2 deletions e2e-tests/test_bluegreen_stateless.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,40 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIME
wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE ${TIMEOUT} || exit 1

echo "PATCHING B/G deployment..."
#kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"rest.port":"8082","taskmanager.numberOfTaskSlots":"2"}}}}}'
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"}}}}}'
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"rest.port":"8082","taskmanager.numberOfTaskSlots":"2"}}}}}'
#kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"}}}}}'

# Diagnostic loop: six rounds, 30 seconds apart. Each round prints the pod
# list, the green JobManager and TaskManager logs, the description of the green
# FlinkDeployment and finally its lifecycleState.
jm_pod_name=""
tm_pod_name=""
for i in $(seq 1 6); do
    echo "====="
    echo "LISTING PODS:"
    kubectl get pods

    # Look the green JobManager pod up until it exists; keep the name afterwards.
    if [ -z "$jm_pod_name" ]; then
        jm_pod_name=$(kubectl get pods --selector="app=${GREEN_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
        echo "Set JM pod name:" "$jm_pod_name"
    fi
    echo "GETTING JM LOGS:"
    # Skip the call while the pod is missing: `kubectl logs` without a pod name
    # only produces a usage error.
    if [ -n "$jm_pod_name" ]; then
        kubectl logs "$jm_pod_name" -c flink-main-container
    else
        echo "(green JM pod not found yet)"
    fi

    echo "--==--"
    # Same lookup-once logic for the green TaskManager pod.
    if [ -z "$tm_pod_name" ]; then
        tm_pod_name=$(kubectl get pods --selector="app=${GREEN_CLUSTER_ID},component=taskmanager" -o jsonpath='{..metadata.name}')
        echo "Set TM pod name:" "$tm_pod_name"
    fi
    echo "GETTING TM LOGS:"
    if [ -n "$tm_pod_name" ]; then
        # Name the container explicitly, matching the JobManager call above.
        kubectl logs "$tm_pod_name" -c flink-main-container
    else
        echo "(green TM pod not found yet)"
    fi

    echo "--=EVs=--"
    kubectl describe flinkdep "$GREEN_CLUSTER_ID"
    # kubectl get events --field-selector involvedObject.kind=FlinkDeployment,involvedObject.name=$GREEN_CLUSTER_ID --sort-by=.metadata.creationTimestamp
    sleep 30

    # GREEN_APPLICATION_IDENTIFIER is left unquoted, as in the wait_for_status
    # calls of this script. NOTE(review): confirm it never relies on word splitting.
    status=$(kubectl get -oyaml $GREEN_APPLICATION_IDENTIFIER | yq '.status.lifecycleState')
    echo "==> lifecycleState after 30 secs: " "$status"
done


wait_for_status $GREEN_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
kubectl wait --for=delete deployment --timeout=${TIMEOUT}s --selector="app=${BLUE_CLUSTER_ID}"
Expand Down
Loading