Commit cdd81c7 (parent 6a426b2)

[FLINK-35266][snapshot] Add E2E tests for FlinkStateSnapshot

7 files changed: +252 additions, -0 deletions


.github/workflows/ci.yml

Lines changed: 11 additions & 0 deletions

@@ -87,6 +87,7 @@ jobs:
 - test_multi_sessionjob.sh
 - test_autoscaler.sh
 - test_flink_operator_ha.sh
+- test_snapshot.sh
 include:
 - namespace: flink
   extraArgs: '--create-namespace --set "watchNamespaces={default,flink}"'
@@ -111,20 +112,30 @@ jobs:
   test: test_autoscaler.sh
 - mode: standalone
   test: test_dynamic_config.sh
+- mode: standalone
+  test: test_snapshot.sh
 - version: v1_16
   test: test_autoscaler.sh
 - version: v1_16
   test: test_dynamic_config.sh
 - version: v1_16
   test: test_flink_operator_ha.sh
+- version: v1_16
+  test: test_snapshot.sh
 - version: v1_17
   test: test_dynamic_config.sh
 - version: v1_17
   test: test_flink_operator_ha.sh
+- version: v1_17
+  test: test_snapshot.sh
 - version: v1_18
   test: test_dynamic_config.sh
 - version: v1_18
   test: test_flink_operator_ha.sh
+- version: v1_18
+  test: test_snapshot.sh
+- version: v1_19
+  test: test_snapshot.sh
 - version: v1_16
   java-version: 17
 - version: v1_17
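The workflow changes only register test_snapshot.sh in the existing e2e matrix (namespace variants, standalone mode, and Flink versions v1_16 through v1_19). For a local run outside CI, a minimal sketch, assuming a test cluster with the operator and its CRDs (including FlinkStateSnapshot) is already installed the way the earlier workflow steps set it up:

# Run the new e2e test directly; it sources e2e-tests/utils.sh and drives kubectl itself
cd e2e-tests
./test_snapshot.sh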

e2e-tests/data/savepoint.yaml

Lines changed: 32 additions & 0 deletions (new file)

@@ -0,0 +1,32 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkStateSnapshot
metadata:
  name: example-savepoint
spec:
  backoffLimit: 0
  jobReference:
    kind: FlinkDeployment
    name: flink-example-statemachine
  savepoint:
    alreadyExists: false
    disposeOnDelete: true
    formatType: CANONICAL
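Outside the test harness, the same CR can be exercised by hand; a minimal sketch, assuming the flink-example-statemachine FlinkDeployment is running with kubernetes.operator.snapshot.resource.enabled set to "true":

# Trigger the savepoint and wait for the operator to complete it
kubectl apply -f e2e-tests/data/savepoint.yaml
kubectl wait --timeout=300s --for=jsonpath='{.status.state}'=COMPLETED flinksnp/example-savepoint
# The savepoint location is published on the CR status
kubectl get flinksnp/example-savepoint -o yaml | yq '.status.path'
# With disposeOnDelete: true, deleting the CR also disposes of the savepoint data
kubectl delete flinksnp/example-savepoint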

e2e-tests/test_autoscaler.sh

File mode changed: 100644 → 100755

e2e-tests/test_dynamic_config.sh

File mode changed: 100644 → 100755

e2e-tests/test_flink_operator_ha.sh

File mode changed: 100644 → 100755
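The mode change 100644 → 100755 adds the executable bit to these existing e2e scripts; locally the equivalent would be, for example:

chmod +x e2e-tests/test_autoscaler.sh e2e-tests/test_dynamic_config.sh e2e-tests/test_flink_operator_ha.sh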

e2e-tests/test_snapshot.sh

Lines changed: 178 additions & 0 deletions (new file)

@@ -0,0 +1,178 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This script tests the FlinkStateSnapshot CR as follows:
# 1. Create a deployment with FlinkStateSnapshot disabled and upgrade it, then enable FlinkStateSnapshot and assert that the saved savepoint is used.
# 2. Trigger and dispose of a savepoint by creating a new FlinkStateSnapshot savepoint CR.
# 3. Trigger a savepoint using the savepoint trigger nonce.
# 4. Trigger a checkpoint using the checkpoint trigger nonce.
# 5. Test periodic savepoints triggered by the operator.
# 6. Suspend the job in savepoint upgrade mode and assert that a new FlinkStateSnapshot CR is created.

SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
source "${SCRIPT_DIR}/utils.sh"

CLUSTER_ID="flink-example-statemachine"
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-cr.yaml"
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"

SAVEPOINT_YAML="${SCRIPT_DIR}/data/savepoint.yaml"
SAVEPOINT_IDENTIFIER="flinksnp/example-savepoint"

TIMEOUT=300

on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
on_exit cleanup_snapshots "$CLUSTER_ID" $TIMEOUT

retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"upgradeMode": "savepoint"},"flinkConfiguration":{"web.checkpoints.history":"1000"}}}'

wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)

wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1


# Test upgrade by setting the legacy savepoint field
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"state": "suspended"}}}'
wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1

location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSnapshotReference.path')
if [ "$location" == "" ]; then echo "Legacy savepoint location was empty"; exit 1; fi
echo "Removing upgradeSnapshotReference and setting lastSavepoint"
kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}'

# Delete the operator Pod to clear the CR state cache
kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name)
sleep 20
retry_times 10 10 "kubectl wait -n $(get_operator_pod_namespace) --for=condition=Ready --timeout=${TIMEOUT}s pod/$(get_operator_pod_name)" || exit 1

echo "Restarting deployment and asserting savepoint path used"
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }'
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1

jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "execution.savepoint.path, ${location}" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1


# Enable FlinkStateSnapshot CRs
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.snapshot.resource.enabled":"true"}}}'
job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}')
echo "Found job ID $job_id"


# Test manual savepoint trigger and disposal via the FlinkStateSnapshot CR
echo "Creating manual savepoint..."
retry_times 5 30 "kubectl apply -f $SAVEPOINT_YAML" || exit 1
wait_for_status $SAVEPOINT_IDENTIFIER '.status.state' "COMPLETED" $TIMEOUT || exit 1

location=$(kubectl get $SAVEPOINT_IDENTIFIER -o yaml | yq '.status.path')
if [ "$location" == "" ]; then echo "Manual savepoint location was empty"; exit 1; fi

echo "Disposing manual savepoint..."
kubectl delete $SAVEPOINT_IDENTIFIER
wait_for_logs $jm_pod_name "Disposing savepoint $location" ${TIMEOUT} || exit 1


# Test manual savepoint via trigger nonce
kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"savepointTriggerNonce": 123456 } } }'

echo "Waiting for manual savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "manual" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
echo "Found snapshot with name $snapshot"

wait_for_status flinksnp/$snapshot '.status.spec.checkpoint' null $TIMEOUT || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' null $TIMEOUT || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' null $TIMEOUT || exit 1
if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ]; then echo "Manual savepoint location was empty"; exit 1; fi
kubectl delete flinksnp/$snapshot


# Test manual checkpoint via trigger nonce
kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"checkpointTriggerNonce": 123456 } } }'

echo "Waiting for manual checkpoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "checkpoint" "manual" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi

echo "Found checkpoint with name $snapshot"

wait_for_status flinksnp/$snapshot '.status.spec.savepoint' null $TIMEOUT || exit 1
if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ]; then echo "Manual checkpoint location was empty"; exit 1; fi
kubectl delete flinksnp/$snapshot


# Test periodic savepoints
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":"60s"}}}'
sleep 20

echo "Waiting for periodic savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "periodic" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi

echo "Found periodic savepoint: $snapshot"
if [ "$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')" == "" ]; then echo "Periodic savepoint location was empty"; exit 1; fi
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration":{"kubernetes.operator.periodic.savepoint.interval":""}}}'


# Test upgrade savepoint
echo "Suspending deployment..."
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"state":"suspended"}}}'
wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1

echo "Waiting for upgrade savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
echo "Found upgrade snapshot: $snapshot"
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || exit 1

location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')
if [ "$location" == "" ]; then echo "Upgrade savepoint location was empty"; exit 1; fi


echo "Restarting deployment..."
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }'
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
wait_for_status $APPLICATION_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1

jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)

# Check that the new JobManager restores from the upgrade savepoint
wait_for_logs $jm_pod_name "Restoring job .* from Savepoint" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "execution.savepoint.path, ${location}" ${TIMEOUT} || exit 1
wait_for_logs $jm_pod_name "Completed checkpoint [0-9]+ for job" ${TIMEOUT} || exit 1

kubectl delete flinksnp/$snapshot

echo "Successfully ran the FlinkStateSnapshot test"

e2e-tests/utils.sh

Lines changed: 31 additions & 0 deletions

@@ -97,6 +97,25 @@ function wait_for_event {
   exit 1
 }

+function wait_for_snapshot {
+  local job_name=$1
+  local snapshot_type=$2
+  local trigger_type=$3
+  local timeout=$4
+  local prefix="$job_name-$snapshot_type-$trigger_type"
+
+  for i in $(seq 1 ${timeout}); do
+    snapshot_name=$(kubectl get flinksnp --sort-by=.metadata.creationTimestamp | grep $prefix | awk '{print $1}' | tail -n 1)
+    if [ "$snapshot_name" ]; then
+      kubectl wait --timeout=${timeout}s --for=jsonpath='{.status.state}'=COMPLETED flinksnp/$snapshot_name > /dev/null || return 1
+      echo "$snapshot_name"
+      return 0
+    fi
+    sleep 1
+  done
+  return 1
+}
+
 function assert_available_slots() {
   expected=$1
   CLUSTER_ID=$2
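In test_snapshot.sh above, this helper blocks until the operator materializes a snapshot CR whose name starts with "<job>-<type>-<trigger>", for example:

# Wait for the newest <cluster>-savepoint-manual-* snapshot to reach COMPLETED and capture its name
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "manual" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi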
@@ -331,6 +350,18 @@ function operator_cleanup_and_exit() {
   fi
 }

+function cleanup_snapshots() {
+  echo "Starting cleanup of FlinkStateSnapshot resources"
+
+  CLUSTER_ID=$1
+  TIMEOUT=$2
+
+  kubectl get flinksnp | grep "^${CLUSTER_ID}" | awk '{print $1}' | xargs -n 1 -P 5 kubectl patch flinksnp -p '{"metadata":{"finalizers":null}}' --type=merge
+  kubectl get flinksnp | grep "^${CLUSTER_ID}" | awk '{print $1}' | xargs -n 1 -P 5 kubectl delete --timeout=${TIMEOUT}s flinksnp
+
+  echo "Finished cleaning up FlinkStateSnapshot resources"
+}
+
 function _on_exit_callback {
   # Export the exit code so that it could be used by the callback commands
   export TRAPPED_EXIT_CODE=$?
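test_snapshot.sh registers this cleanup through the existing on_exit hook; finalizers are cleared first so that deleting leftover FlinkStateSnapshot resources does not wait on operator-side finalization:

# Registered at the top of test_snapshot.sh, next to cleanup_and_exit
on_exit cleanup_snapshots "$CLUSTER_ID" $TIMEOUT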
