Skip to content

Commit f34482d

Browse files
[FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs and add batch e2e test
1 parent 48359a3 commit f34482d

File tree

5 files changed

+176
-3
lines changed

5 files changed

+176
-3
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ jobs:
161161
- test_autoscaler.sh
162162
- test_flink_operator_ha.sh
163163
- test_snapshot.sh
164+
- test_batch_job.sh
164165
exclude:
165166
- flink-version: v1_16
166167
test: test_autoscaler.sh
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkDeployment
21+
metadata:
22+
namespace: default
23+
name: flink-example-wordcount-batch
24+
spec:
25+
image: flink:1.20
26+
flinkVersion: v1_20
27+
ingress:
28+
template: "/{{namespace}}/{{name}}(/|$)(.*)"
29+
className: "nginx"
30+
annotations:
31+
nginx.ingress.kubernetes.io/rewrite-target: "/$2"
32+
flinkConfiguration:
33+
taskmanager.numberOfTaskSlots: "2"
34+
high-availability.type: kubernetes
35+
high-availability.storageDir: file:///opt/flink/volume/flink-ha
36+
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
37+
state.savepoints.dir: file:///opt/flink/volume/flink-sp
38+
kubernetes.operator.snapshot.resource.enabled: "false"
39+
serviceAccount: flink
40+
podTemplate:
41+
spec:
42+
initContainers:
43+
- name: artifacts-fetcher
44+
image: busybox:1.35.0
45+
imagePullPolicy: IfNotPresent
46+
# Use wget or other tools to get user jars from remote storage
47+
command: [ 'wget', 'STREAMING_EXAMPLES_JAR_URL', '-O', '/flink-artifact/myjob.jar' ]
48+
volumeMounts:
49+
- mountPath: /flink-artifact
50+
name: flink-artifact
51+
containers:
52+
# Do not change the main container name
53+
- name: flink-main-container
54+
resources:
55+
requests:
56+
ephemeral-storage: 2048Mi
57+
limits:
58+
ephemeral-storage: 2048Mi
59+
volumeMounts:
60+
- mountPath: /opt/flink/usrlib
61+
name: flink-artifact
62+
- mountPath: /opt/flink/volume
63+
name: flink-volume
64+
volumes:
65+
- name: flink-artifact
66+
emptyDir: { }
67+
- name: flink-volume
68+
persistentVolumeClaim:
69+
claimName: flink-example-wordcount-batch
70+
jobManager:
71+
resource:
72+
memory: "1024m"
73+
cpu: 0.5
74+
taskManager:
75+
resource:
76+
memory: "1Gi"
77+
cpu: 0.5
78+
job:
79+
jarURI: local:///opt/flink/usrlib/myjob.jar
80+
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
81+
args: ["--execution-mode", "BATCH"]
82+
parallelism: 2
83+
upgradeMode: stateless
84+
mode: native
85+
86+
---
87+
apiVersion: v1
88+
kind: PersistentVolumeClaim
89+
metadata:
90+
name: flink-example-wordcount-batch
91+
spec:
92+
accessModes:
93+
- ReadWriteOnce
94+
volumeMode: Filesystem
95+
resources:
96+
requests:
97+
storage: 1Gi
98+
99+
---
100+
apiVersion: networking.k8s.io/v1
101+
kind: IngressClass
102+
metadata:
103+
annotations:
104+
ingressclass.kubernetes.io/is-default-class: "true"
105+
labels:
106+
app.kubernetes.io/component: controller
107+
name: nginx
108+
spec:
109+
controller: k8s.io/ingress-nginx

e2e-tests/test_batch_job.sh

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# This script tests basic Flink batch job operations on Kubernetes:
21+
# 1. Deploys a FlinkDeployment for a batch job.
22+
# 2. Waits for the JobManager to become ready.
23+
# 3. Verifies that the job reaches the FINISHED state (first check).
24+
# 4. Sleeps for a configurable duration to test for state persistence.
25+
# 5. Verifies that the job remains in the FINISHED state (second check).
26+
# 6. Checks the operator logs for the expected job state transition message.
27+
# 7. Checks the JobManager logs for successful application completion.
28+
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
29+
source "${SCRIPT_DIR}/utils.sh"
30+
31+
CLUSTER_ID="flink-example-wordcount-batch"
32+
APPLICATION_YAML="${SCRIPT_DIR}/data/flinkdep-batch-cr.yaml"
33+
APPLICATION_IDENTIFIER="flinkdep/$CLUSTER_ID"
34+
TIMEOUT=300
35+
SLEEP_DURATION=30
36+
37+
on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $CLUSTER_ID
38+
39+
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
40+
41+
wait_for_jobmanager_running $CLUSTER_ID $TIMEOUT
42+
43+
# Wait for the job to reach the FINISHED state (first check).
44+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
45+
46+
echo "Job reached FINISHED state. Sleeping for $SLEEP_DURATION seconds..."
47+
sleep "$SLEEP_DURATION"
48+
49+
# Verify the job is *still* in the FINISHED state (second check).
50+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
51+
52+
# Verify the job status change to FINISHED shows up in the operator logs.
53+
operator_pod_name=$(get_operator_pod_name)
54+
wait_for_operator_logs "$operator_pod_name" "Job status changed from .* to FINISHED" ${TIMEOUT} || exit 1
55+
56+
# Verify the job completed successfully in the job manager logs.
57+
jm_pod_name=$(get_jm_pod_name $CLUSTER_ID)
58+
wait_for_logs "$jm_pod_name" "Application completed SUCCESSFULLY" ${TIMEOUT} || exit 1
59+
60+
echo "Successfully ran the batch job test"

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
import java.util.Optional;
143143
import java.util.concurrent.Callable;
144144
import java.util.concurrent.CompletableFuture;
145+
import java.util.concurrent.ExecutionException;
145146
import java.util.concurrent.ExecutorService;
146147
import java.util.concurrent.TimeUnit;
147148
import java.util.concurrent.TimeoutException;
@@ -546,7 +547,7 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
546547
try {
547548
latestCheckpointOpt = getCheckpointInfo(jobId, conf).f0;
548549
} catch (Exception e) {
549-
if (e instanceof RestClientException
550+
if (e instanceof ExecutionException
550551
&& e.getMessage() != null
551552
&& e.getMessage().contains("Checkpointing has not been enabled")) {
552553
LOG.warn("Checkpointing not enabled for job {}", jobId, e);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import java.util.Random;
9999
import java.util.Set;
100100
import java.util.concurrent.CompletableFuture;
101+
import java.util.concurrent.ExecutionException;
101102
import java.util.concurrent.TimeoutException;
102103
import java.util.function.Consumer;
103104
import java.util.stream.Collectors;
@@ -595,8 +596,9 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
595596
Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>>
596597
getCheckpointInfo(JobID jobId, Configuration conf) throws Exception {
597598
if (throwCheckpointingDisabledError) {
598-
throw new RestClientException(
599-
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST);
599+
throw new ExecutionException(
600+
new RestClientException(
601+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
600602
}
601603

602604
if (checkpointInfo != null) {

0 commit comments

Comments
 (0)