Skip to content

Commit c4d460b

Browse files
[FLINK-37370] [Observer] Fix the exception type caught when checkpointing is not enabled for batch jobs, and add a batch e2e test
1 parent 48359a3 commit c4d460b

File tree

6 files changed

+164
-3
lines changed

6 files changed

+164
-3
lines changed

.github/workflows/ci.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ jobs:
161161
- test_autoscaler.sh
162162
- test_flink_operator_ha.sh
163163
- test_snapshot.sh
164+
- test_batch_job.sh
164165
exclude:
165166
- flink-version: v1_16
166167
test: test_autoscaler.sh
@@ -172,18 +173,24 @@ jobs:
172173
test: test_flink_operator_ha.sh
173174
- flink-version: v1_16
174175
test: test_snapshot.sh
176+
- flink-version: v1_16
177+
test: test_batch_job.sh
175178
- flink-version: v1_17
176179
test: test_dynamic_config.sh
177180
- flink-version: v1_17
178181
test: test_flink_operator_ha.sh
179182
- flink-version: v1_17
180183
test: test_snapshot.sh
184+
- flink-version: v1_17
185+
test: test_batch_job.sh
181186
- flink-version: v1_18
182187
test: test_dynamic_config.sh
183188
- flink-version: v1_18
184189
test: test_flink_operator_ha.sh
185190
- flink-version: v1_18
186191
test: test_snapshot.sh
192+
- flink-version: v1_18
193+
test: test_batch_job.sh
187194
- flink-version: v1_19
188195
test: test_snapshot.sh
189196
uses: ./.github/workflows/e2e.yaml

.github/workflows/e2e.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ jobs:
8888
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"
8989
if [[ ${{ inputs.flink-version }} == v2* ]]; then
9090
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/2.0-preview1/flink-examples-streaming-2.0-preview1.jar"
91+
elif [[ "${{ inputs.test }}" == "test_batch_job.sh" ]]; then
92+
EXAMPLES_JAR="https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming/1.20.1/flink-examples-streaming-1.20.1.jar"
9193
fi
9294
ESCAPED_EXAMPLES_JAR=$(printf '%s\n' "$EXAMPLES_JAR" | sed -e 's/[\/&]/\\&/g')
9395
e2e-tests/data/flinkdep-batch-cr.yaml

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkDeployment
21+
metadata:
22+
namespace: default
23+
name: flink-example-wordcount-batch
24+
spec:
25+
image: flink:1.20
26+
flinkVersion: v1_20
27+
ingress:
28+
template: "/{{namespace}}/{{name}}(/|$)(.*)"
29+
className: "nginx"
30+
annotations:
31+
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
32+
flinkConfiguration:
33+
taskmanager.numberOfTaskSlots: "2"
34+
kubernetes.operator.snapshot.resource.enabled: "false"
35+
serviceAccount: flink
36+
podTemplate:
37+
spec:
38+
initContainers:
39+
- name: artifacts-fetcher
40+
image: busybox:1.35.0
41+
imagePullPolicy: IfNotPresent
42+
# Use wget or other tools to get user jars from remote storage
43+
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
44+
volumeMounts:
45+
- mountPath: /flink-artifact
46+
name: flink-artifact
47+
containers:
48+
# Do not change the main container name
49+
- name: flink-main-container
50+
resources:
51+
requests:
52+
ephemeral-storage: 2048Mi
53+
limits:
54+
ephemeral-storage: 2048Mi
55+
volumeMounts:
56+
- mountPath: /opt/flink/usrlib
57+
name: flink-artifact
58+
volumes:
59+
- name: flink-artifact
60+
emptyDir: { }
61+
jobManager:
62+
resource:
63+
memory: "1024m"
64+
cpu: 0.5
65+
taskManager:
66+
resource:
67+
memory: "1Gi"
68+
cpu: 0.5
69+
job:
70+
jarURI: local:///opt/flink/usrlib/myjob.jar
71+
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
72+
args: ["--execution-mode", "BATCH"]
73+
parallelism: 2
74+
upgradeMode: stateless
75+
mode: native
76+
77+
---
78+
apiVersion: networking.k8s.io/v1
79+
kind: IngressClass
80+
metadata:
81+
annotations:
82+
ingressclass.kubernetes.io/is-default-class: "true"
83+
labels:
84+
app.kubernetes.io/component: controller
85+
name: nginx
86+
spec:
87+
controller: k8s.io/ingress-nginx

e2e-tests/test_batch_job.sh

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# This script tests basic Flink batch job operations on Kubernetes:
21+
# 1. Deploys a FlinkDeployment for a batch job.
22+
# 2. Waits for the JobManager to become ready.
23+
# 3. Verifies that the job reaches the FINISHED state.
24+
# 4. Applies a no-op spec change and verifies the job remains in the FINISHED state.
25+
# 5. Checks the operator logs for the expected job state transition message.
26+
# 6. Checks the JobManager logs for successful application completion.
27+
# 7. Applies a spec change and verifies the job re-runs successfully.
28+
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
29+
source "${SCRIPT_DIR}/utils.sh"
30+
31+
CLUSTER_ID="flink-example-wordcount-batch"
32+
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
33+
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
34+
TIMEOUT=300
35+
36+
on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
37+
38+
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
39+
40+
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
41+
42+
# Wait for the job to reach the FINISHED state.
43+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
44+
45+
# Apply a no-op spec change; verify the job remains in the FINISHED state.
46+
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"flinkConfiguration": {"kubernetes.operator.deployment.readiness.timeout": "6h" } } }'
47+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
48+
49+
# Verify the job status change to FINISHED shows up in the operator logs.
50+
operator_pod_name=$(get_operator_pod_name)
51+
wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1
52+
53+
# Verify the job completed successfully in the job manager logs.
54+
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
55+
wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1
56+
57+
# Apply a spec change; verify the job re-runs and reaches the FINISHED state.
58+
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"parallelism": 1 } } }'
59+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RECONCILING $TIMEOUT || exit 1
60+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
61+
62+
echo "Successfully ran the batch job test"

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import java.util.Optional;
143143
import java.util.concurrent.Callable;
144144
import java.util.concurrent.CompletableFuture;
145+
import java.util.concurrent.ExecutionException;
145146
import java.util.concurrent.ExecutorService;
146147
import java.util.concurrent.TimeUnit;
147148
import java.util.concurrent.TimeoutException;
@@ -546,7 +547,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
546547
try {
547548
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
548549
} catch (Exception e) {
549-
if (e instanceof RestClientException
550+
if (e instanceof ExecutionException
550551
&& e.getMessage() != null
551552
&& e.getMessage().contains("Checkpointing has not been enabled")) {
552553
LOG.warn("Checkpointing not enabled for job {}", jobId, e);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import java.util.Random;
9999
import java.util.Set;
100100
import java.util.concurrent.CompletableFuture;
101+
import java.util.concurrent.ExecutionException;
101102
import java.util.concurrent.TimeoutException;
102103
import java.util.function.Consumer;
103104
import java.util.stream.Collectors;
@@ -595,8 +596,9 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
595596
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
596597
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
597598
if (throwCheckpointingDisabledError) {
598-
throw new RestClientException(
599-
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
599+
throw new ExecutionException(
600+
new RestClientException(
601+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
600602
}
601603

602604
if (checkpointInfo != null) {

0 commit comments

Comments
 (0)